diff --git a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs index d98ac9e..1f16051 100644 --- a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs +++ b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs @@ -3,6 +3,7 @@ using JT809.DotNetty.Core.Interfaces; using JT809.DotNetty.Core.Metadata; using JT809.DotNetty.Core.Session; using JT809.Protocol; +using JT809.Protocol.Configs; using JT809.Protocol.Enums; using JT809.Protocol.Extensions; using JT809.Protocol.MessageBody; @@ -20,15 +21,18 @@ namespace JT809.DotNetty.Core.Internal private readonly JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager; private readonly ILogger logger; private readonly JT809Serializer JT809Serializer; + private readonly JT809HeaderOptions JT809HeaderOptions; public JT809SubordinateLinkNotifyImplService( + JT809HeaderOptions jT809HeaderOptions, ILoggerFactory loggerFactory, - IJT809Config jT809Config, + IJT809Config jT809Config, IOptions jT809ConfigurationAccessor, JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager ) { JT809Serializer = jT809Config.GetSerializer(); + JT809HeaderOptions = jT809HeaderOptions; this.logger = loggerFactory.CreateLogger(); configuration = jT809ConfigurationAccessor.Value; this.jT809SuperiorMainSessionManager = jT809SuperiorMainSessionManager; @@ -36,28 +40,26 @@ namespace JT809.DotNetty.Core.Internal public void Notify(JT809_0x9007_ReasonCode reasonCode) { -#warning JT809GlobalConfig - //Notify(reasonCode, JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID); + Notify(reasonCode, JT809HeaderOptions.MsgGNSSCENTERID); } public void Notify(JT809_0x9007_ReasonCode reasonCode, uint msgGNSSCENTERID) { if (configuration.SubordinateClientEnable) { -#warning jT809SuperiorMainSessionManager - //var session = jT809SuperiorMainSessionManager.GetSession(JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID); - //if (session != null) - //{ - // //发送从链路注销请求 - // var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007() - // { - // ReasonCode = reasonCode - // }); - // JT809Response jT809Response = new JT809Response(package, 100); - // if (logger.IsEnabled(LogLevel.Information)) - // logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); - // session.Channel.WriteAndFlushAsync(jT809Response); - //} + var session = jT809SuperiorMainSessionManager.GetSession(msgGNSSCENTERID); + if (session != null) + { + //发送从链路注销请求 + var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007() + { + ReasonCode = reasonCode + }); + JT809Response jT809Response = new JT809Response(package, 100); + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); + session.Channel.WriteAndFlushAsync(jT809Response); + } } } } diff --git a/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809InferiorService.cs b/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809InferiorService.cs index 951574a..86f5423 100644 --- a/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809InferiorService.cs +++ b/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809InferiorService.cs @@ -11,6 +11,7 @@ using System.Threading.Tasks; using JT809.Protocol.SubMessageBody; using JT809.Protocol.Metadata; using JT809.Protocol.MessageBody; +using JT809.Protocol.Enums; namespace JT809.Inferior.Client { @@ -101,7 +102,7 @@ namespace JT809.Inferior.Client Vec3 = 12 } }; - var package = JT809.Protocol.Enums.JT809BusinessType.主链路车辆动态信息交换业务.Create(jT809_0X1200); + var package = JT809BusinessType.主链路车辆动态信息交换业务.Create(jT809_0X1200); mainClient.SendAsync(new JT809Response(package, 256)); logger.LogDebug($"Thread:{Thread.CurrentThread.ManagedThreadId}-4s"); Thread.Sleep(4000); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs index 7f32ab2..fa62d1f 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs @@ -1,4 +1,5 @@ using JT809.GrpcProtos; +using JT809.KafkaService; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -13,13 +14,13 @@ namespace JT809.GpsConsumer { public class GpsConsumerService : IHostedService { - private readonly IJT808ConsumerOfT jT808Consumer; + private readonly JT809_GpsPosition_Consumer jT808Consumer; private readonly ILogger logger; public GpsConsumerService( ILoggerFactory loggerFactory, - IJT808ConsumerOfT jT808Consumer) + JT809_GpsPosition_Consumer jT808Consumer) { this.jT808Consumer = jT808Consumer; logger = loggerFactory.CreateLogger(); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs index f178a8c..35f34fd 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs @@ -27,8 +27,7 @@ namespace JT809.GpsConsumer { services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); -#warning AddJT809KafkaConsumerPartitionsService - //services.AddJT809KafkaConsumerPartitionsService(hostContext.Configuration, options => options.Partition = 10); + services.AddJT809KafkaConsumerService(hostContext.Configuration); services.AddHostedService(); }); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameConsumerConfig.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameConsumerConfig.cs new file mode 100644 index 0000000..facad38 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameConsumerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaService.Configs +{ + public class JT809SameConsumerConfig : ConsumerConfig, IOptions + { + public string TopicName { get; set; } + + public JT809SameConsumerConfig Value => this; + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameProducerConfig.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameProducerConfig.cs new file mode 100644 index 0000000..a88d3bf --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameProducerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaService.Configs +{ + public class JT809SameProducerConfig : ProducerConfig,IOptions + { + public string TopicName { get; set; } + + public JT809SameProducerConfig Value => this; + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs deleted file mode 100644 index 236a5c8..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs +++ /dev/null @@ -1,103 +0,0 @@ -using Confluent.Kafka; -using JT809.KafkaService.Configs; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace JT809.KafkaService -{ - public abstract class JT809Consumer : JT809ConsumerBase - { - private bool _disposed = false; - public override CancellationTokenSource Cts { get; }= new CancellationTokenSource(); - - protected ILogger logger { get; } - - protected override IConsumer Consumer { get; } - - protected JT809Consumer( - IOptions consumerConfigAccessor, - ILoggerFactory loggerFactory) - : base(consumerConfigAccessor.Value) - { - logger = loggerFactory.CreateLogger("JT809Consumer"); - ConsumerBuilder consumerBuilder = new ConsumerBuilder(ConsumerConfig); - consumerBuilder.SetErrorHandler((consumer, error) => - { - logger.LogError(error.Reason); - }); - Consumer = consumerBuilder.Build(); - } - - public override void OnMessage(Action<(string MsgId, T Data)> callback) - { - Task.Run(() => - { - while (!Cts.IsCancellationRequested) - { - try - { - //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 - //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 - //consumers[n].Assign(topicPartitionList[n]); - var data = Consumer.Consume(Cts.Token); - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); - } - callback((data.Message.Key, data.Message.Value)); - } - catch (ConsumeException ex) - { - logger.LogError(ex, ConsumerConfig.TopicName); - } - catch (Exception ex) - { - logger.LogError(ex, ConsumerConfig.TopicName); - } - } - }, Cts.Token); - } - - public override void Subscribe() - { - if (_disposed) return; - //仅有一个分区才需要订阅 - Consumer.Subscribe(ConsumerConfig.TopicName); - } - - public override void Unsubscribe() - { - if (_disposed) return; - Consumer.Unsubscribe(); - } - - public override void Dispose() - { - Dispose(true); - GC.SuppressFinalize(true); - - } - - ~JT809Consumer() - { - Dispose(false); - } - - protected virtual void Dispose(bool disposing) - { - if (_disposed) return; - if (disposing) - { - Cts.Cancel(); - Consumer.Close(); - Consumer.Dispose(); - Cts.Dispose(); - } - _disposed = true; - } - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs deleted file mode 100644 index 5e64cea..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs +++ /dev/null @@ -1,30 +0,0 @@ -using Confluent.Kafka; -using JT809.KafkaService.Configs; -using JT809.PubSub.Abstractions; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace JT809.KafkaService -{ - public abstract class JT809ConsumerBase : IJT808ConsumerOfT - { - public JT809ConsumerConfig ConsumerConfig { get; } - - protected JT809ConsumerBase(IOptions config) - { - ConsumerConfig = config.Value; - } - - public abstract CancellationTokenSource Cts { get; } - protected abstract IConsumer Consumer { get; } - - public abstract void Dispose(); - public abstract void OnMessage(Action<(string MsgId, T Data)> callback); - public abstract void Subscribe(); - public abstract void Unsubscribe(); - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs index 50ebc8e..67b270b 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs @@ -1,5 +1,6 @@ using Confluent.Kafka; using JT809.GrpcProtos; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -15,43 +16,33 @@ namespace JT809.KafkaService { public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) { - serviceDescriptors.Configure(configuration.GetSection("KafkaProducerConfig")); - serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT), (service) => { - var producerConfig = service.GetRequiredService>(); -#warning JT809_Same_Producer - //return new JT809_Same_Producer(new JT809TopicOptions { TopicName = "jt809.same" }, producerConfig); - return null; - }); - serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT), (service) => { - var producerConfig = service.GetRequiredService>(); -#warning JT809_GpsPositio_Producer - //return new JT809_GpsPositio_Producer(new JT809TopicOptions { TopicName = "jt809.gps" }, producerConfig); - return null; - }); + serviceDescriptors.Configure(configuration.GetSection("JT809ProducerConfig")); + serviceDescriptors.AddSingleton(); return serviceDescriptors; } public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) { - serviceDescriptors.Configure(configuration.GetSection("KafkaConsumerConfig")); - serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT), (service)=> { - var loggerFactory = service.GetRequiredService(); - var consumerConfig = service.GetRequiredService>(); - consumerConfig.Value.GroupId = "JT809.same.Test"; -#warning JT809_Same_Consumer - //return new JT809_Same_Consumer(new JT809TopicOptions { TopicName = "jt809.same" }, consumerConfig, loggerFactory); - return null; - }); - serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT), (service) => { - var loggerFactory = service.GetRequiredService(); - var consumerConfig = service.GetRequiredService>(); - consumerConfig.Value.GroupId = "JT809.gps.Test"; -#warning JT809_GpsPosition_Consumer - //return new JT809_GpsPosition_Consumer(new JT809TopicOptions { TopicName = "jt809.gps" }, consumerConfig, loggerFactory); - return null; - }); + serviceDescriptors.Configure(configuration.GetSection("JT809ConsumerConfig")); + serviceDescriptors.AddSingleton(); return serviceDescriptors; } + + public static IServiceCollection AddJT809KafkaSameProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.Configure(configuration.GetSection("JT809SameProducerConfig")); + serviceDescriptors.AddSingleton(); + return serviceDescriptors; + } + + + public static IServiceCollection AddJT809KafkaSameConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.Configure(configuration.GetSection("JT809SameConsumerConfig")); + serviceDescriptors.AddSingleton(); + return serviceDescriptors; + } + } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs deleted file mode 100644 index 7268102..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs +++ /dev/null @@ -1,67 +0,0 @@ -using Confluent.Kafka; -using Confluent.Kafka.Admin; -using JT809.KafkaService.Configs; -using JT809.PubSub.Abstractions; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; - -namespace JT809.KafkaService -{ - /// - /// - /// - /// - public abstract class JT809Producer : JT809ProducerBase - { - private bool _disposed = false; - - protected virtual IProducer CreateProducer() - { - ProducerBuilder producerBuilder = new ProducerBuilder(ProducerConfig); - return producerBuilder.Build(); - } - - protected override IProducer Producer { get; } - - protected JT809Producer( - IOptions producerConfigAccessor) - : base(producerConfigAccessor.Value) - { - Producer = CreateProducer(); - } - - public override void Dispose() - { - Dispose(true); - GC.SuppressFinalize(true); - - } - - protected virtual void Dispose(bool disposing) - { - if (_disposed) return; - if (disposing) - { - Producer.Dispose(); - } - _disposed = true; - } - - public override async void ProduceAsync(string msgId, string vno_color, T data) - { - if (_disposed) return; - await Producer.ProduceAsync(ProducerConfig.TopicName, new Message - { - Key = msgId, - Value = data - }); - } - - ~JT809Producer() - { - Dispose(false); - } - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs deleted file mode 100644 index afe9bba..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs +++ /dev/null @@ -1,23 +0,0 @@ -using Confluent.Kafka; -using JT809.KafkaService.Configs; -using JT809.PubSub.Abstractions; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT809.KafkaService -{ - public abstract class JT809ProducerBase : IJT809ProducerOfT - { - protected JT809ProducerBase(IOptions config) - { - ProducerConfig = config.Value; - } - - public JT809ProducerConfig ProducerConfig { get;} - protected abstract IProducer Producer { get;} - public abstract void Dispose(); - public abstract void ProduceAsync(string msgId, string vno_color, T data); - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs index ca3d6b7..4109771 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs @@ -4,13 +4,57 @@ using JT809.GrpcProtos; using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Options; +using System; namespace JT809.KafkaService { - public sealed class JT809_GpsPositio_Producer : JT809Producer + public sealed class JT809_GpsPositio_Producer : IJT809Producer { - public JT809_GpsPositio_Producer(IOptions producerConfigAccessor) : base( producerConfigAccessor) + private readonly JT809ProducerConfig JT809ProducerConfig; + + private IProducer Producer; + + private bool _disposed = false; + + public JT809_GpsPositio_Producer(IOptions producerConfigAccessor) + { + JT809ProducerConfig = producerConfigAccessor.Value; + ProducerBuilder producerBuilder = new ProducerBuilder(producerConfigAccessor.Value); + Producer = producerBuilder.Build(); + TopicName = JT809ProducerConfig.TopicName; + } + + public string TopicName { get; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(true); + } + + void Dispose(bool disposing) + { + if (_disposed) return; + if (disposing) + { + Producer.Dispose(); + } + _disposed = true; + } + + public async void ProduceAsync(string msgId, string vno_color, byte[] data) + { + if (_disposed) return; + await Producer.ProduceAsync(JT809ProducerConfig.TopicName, new Message + { + Key = msgId, + Value = data + }); + } + + ~JT809_GpsPositio_Producer() { + Dispose(false); } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs index 1e55c76..263b641 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs @@ -12,10 +12,97 @@ using System.Threading.Tasks; namespace JT809.KafkaService { - public sealed class JT809_GpsPosition_Consumer : JT809Consumer + public sealed class JT809_GpsPosition_Consumer : IJT809Consumer { - public JT809_GpsPosition_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base( consumerConfigAccessor, loggerFactory) + private bool _disposed = false; + public CancellationTokenSource Cts { get; } = new CancellationTokenSource(); + private ILogger logger { get; } + public string TopicName { get; } + + private IConsumer Consumer; + private readonly JT809ConsumerConfig JT809ConsumerConfig; + + public JT809_GpsPosition_Consumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger("JT809_GpsPosition_Consumer"); + JT809ConsumerConfig = consumerConfigAccessor.Value; + TopicName = JT809ConsumerConfig.TopicName; + ConsumerBuilder consumerBuilder = new ConsumerBuilder(consumerConfigAccessor.Value); + consumerBuilder.SetErrorHandler((consumer, error) => + { + logger.LogError(error.Reason); + }); + Consumer = consumerBuilder.Build(); + } + + public void OnMessage(Action<(string MsgId, byte[] Data)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + //consumers[n].Assign(topicPartitionList[n]); + var data = Consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Message.Key, data.Message.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, JT809ConsumerConfig.TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, JT809ConsumerConfig.TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + if (_disposed) return; + //仅有一个分区才需要订阅 + Consumer.Subscribe(JT809ConsumerConfig.TopicName); + } + + public void Unsubscribe() + { + if (_disposed) return; + Consumer.Unsubscribe(); + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(true); + + } + + ~JT809_GpsPosition_Consumer() + { + Dispose(false); + } + + void Dispose(bool disposing) { + if (_disposed) return; + if (disposing) + { + Cts.Cancel(); + Consumer.Close(); + Consumer.Dispose(); + Cts.Dispose(); + } + _disposed = true; } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs index 956eb73..a4db69f 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs @@ -12,11 +12,97 @@ using System.Threading.Tasks; namespace JT809.KafkaService { - public sealed class JT809_Same_Consumer : JT809Consumer + public sealed class JT809_Same_Consumer : IJT809Consumer { - public JT809_Same_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) - : base(consumerConfigAccessor, loggerFactory) + private bool _disposed = false; + public CancellationTokenSource Cts { get; } = new CancellationTokenSource(); + private ILogger logger { get; } + public string TopicName { get; } + + private IConsumer Consumer; + private readonly JT809SameConsumerConfig JT809ConsumerConfig; + + public JT809_Same_Consumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger("JT809_Same_Consumer"); + JT809ConsumerConfig = consumerConfigAccessor.Value; + TopicName = JT809ConsumerConfig.TopicName; + ConsumerBuilder consumerBuilder = new ConsumerBuilder(consumerConfigAccessor.Value); + consumerBuilder.SetErrorHandler((consumer, error) => + { + logger.LogError(error.Reason); + }); + Consumer = consumerBuilder.Build(); + } + + public void OnMessage(Action<(string MsgId, byte[] Data)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + //consumers[n].Assign(topicPartitionList[n]); + var data = Consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Message.Key, data.Message.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, JT809ConsumerConfig.TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, JT809ConsumerConfig.TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + if (_disposed) return; + //仅有一个分区才需要订阅 + Consumer.Subscribe(JT809ConsumerConfig.TopicName); + } + + public void Unsubscribe() + { + if (_disposed) return; + Consumer.Unsubscribe(); + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(true); + + } + + ~JT809_Same_Consumer() + { + Dispose(false); + } + + void Dispose(bool disposing) { + if (_disposed) return; + if (disposing) + { + Cts.Cancel(); + Consumer.Close(); + Consumer.Dispose(); + Cts.Dispose(); + } + _disposed = true; } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs index 50a58d2..a73c729 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs @@ -8,11 +8,53 @@ using Microsoft.Extensions.Options; namespace JT809.KafkaService { - public sealed class JT809_Same_Producer : JT809Producer + public sealed class JT809_Same_Producer : IJT809Producer { - public JT809_Same_Producer(IOptions producerConfigAccessor) - : base(producerConfigAccessor) + private readonly JT809SameProducerConfig JT809SameProducerConfig; + + private IProducer Producer; + + private bool _disposed = false; + + public JT809_Same_Producer(IOptions producerConfigAccessor) + { + JT809SameProducerConfig = producerConfigAccessor.Value; + ProducerBuilder producerBuilder = new ProducerBuilder(producerConfigAccessor.Value); + Producer= producerBuilder.Build(); + TopicName = JT809SameProducerConfig.Value.TopicName; + } + + public string TopicName { get; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(true); + } + + void Dispose(bool disposing) + { + if (_disposed) return; + if (disposing) + { + Producer.Dispose(); + } + _disposed = true; + } + + public async void ProduceAsync(string msgId, string vno_color, byte[] data) + { + if (_disposed) return; + await Producer.ProduceAsync(JT809SameProducerConfig.TopicName, new Message + { + Key = msgId, + Value = data + }); + } + + ~JT809_Same_Producer() { + Dispose(false); } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs index 51f51dd..b6690d3 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs @@ -4,6 +4,8 @@ using Microsoft.Extensions.DependencyInjection; using JT809.Protocol.Enums; using JT809.Protocol.Extensions; using System.Threading; +using Google.Protobuf; +using JT809.GrpcProtos; namespace JT809.KafkaServiceTest { @@ -16,9 +18,10 @@ namespace JT809.KafkaServiceTest consumerTestService.GpsConsumer.OnMessage((Message)=> { Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId); - Assert.Equal("粤A23456", Message.Data.Vno); - Assert.Equal(2, Message.Data.VColor); - Assert.Equal("smallchi", Message.Data.FromChannel); + JT809GpsPosition jT809GpsPosition = JT809GpsPosition.Parser.ParseFrom(Message.Data); + Assert.Equal("粤A23456", jT809GpsPosition.Vno); + Assert.Equal(2, jT809GpsPosition.VColor); + Assert.Equal("smallchi", jT809GpsPosition.FromChannel); }); consumerTestService.GpsConsumer.Subscribe(); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs index f657756..14e7c16 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs @@ -9,9 +9,9 @@ namespace JT809.KafkaServiceTest { public class ConsumerTestService { - public IJT808ConsumerOfT SameConsumer { get; } - public IJT808ConsumerOfT GpsConsumer { get; } - public ConsumerTestService(IJT808ConsumerOfT sameConsumer, IJT808ConsumerOfT gpsConsumer) + public JT809_Same_Consumer SameConsumer { get; } + public JT809_GpsPosition_Consumer GpsConsumer { get; } + public ConsumerTestService(JT809_Same_Consumer sameConsumer, JT809_GpsPosition_Consumer gpsConsumer) { SameConsumer = sameConsumer; GpsConsumer = gpsConsumer; diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs index efbcf61..0891e5e 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs @@ -3,6 +3,7 @@ using Xunit; using Microsoft.Extensions.DependencyInjection; using JT809.Protocol.Enums; using JT809.Protocol.Extensions; +using Google.Protobuf; namespace JT809.KafkaServiceTest { @@ -12,19 +13,21 @@ namespace JT809.KafkaServiceTest public void Test1() { ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); - producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", new GrpcProtos.JT809GpsPosition - { - Vno= "A23456", - VColor=2, - GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, - FromChannel="smallchi" - }); + + producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", + new GrpcProtos.JT809GpsPosition + { + Vno = "A23456", + VColor = 2, + GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, + FromChannel = "smallchi" + }.ToByteArray()); } [Fact] public void Test2() { - ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); + ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23457_2", new byte[] { 0x01, 0x02, 0x03 }); } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs index ff04f26..7e63c31 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs @@ -9,9 +9,9 @@ namespace JT809.KafkaServiceTest { public class ProducerTestService { - public IJT809ProducerOfT SameProducer { get; } - public IJT809ProducerOfT GpsProducer { get; } - public ProducerTestService(IJT809ProducerOfT sameProducer, IJT809ProducerOfT gpsProducer) + public JT809_Same_Producer SameProducer { get; } + public JT809_GpsPositio_Producer GpsProducer { get; } + public ProducerTestService(JT809_Same_Producer sameProducer, JT809_GpsPositio_Producer gpsProducer) { SameProducer = sameProducer; GpsProducer = gpsProducer; diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs index 7d111e2..8b413f0 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs @@ -5,13 +5,9 @@ using System.Threading; namespace JT809.PubSub.Abstractions { - public interface IJT809Consumer : IJT809PubSub, IJT808ConsumerOfT + public interface IJT809Consumer : IJT809PubSub { - - } - public interface IJT808ConsumerOfT :IDisposable - { - void OnMessage(Action<(string MsgId, T Data)> callback); + void OnMessage(Action<(string MsgId, byte[] Data)> callback); CancellationTokenSource Cts { get; } void Subscribe(); void Unsubscribe(); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs index 6a79338..be22a94 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs @@ -3,12 +3,7 @@ using System.Threading.Tasks; namespace JT809.PubSub.Abstractions { - public interface IJT809Producer: IJT809PubSub, IJT809ProducerOfT - { - - } - - public interface IJT809ProducerOfT: IDisposable + public interface IJT809Producer: IJT809PubSub, IDisposable { /// /// @@ -16,6 +11,6 @@ namespace JT809.PubSub.Abstractions /// 消息Id /// 车牌号+车牌颜色 /// hex data - void ProduceAsync(string msgId, string vno_color, T data); + void ProduceAsync(string msgId, string vno_color, byte[] data); } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809ProducerPartitionFactory.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809ProducerPartitionFactory.cs deleted file mode 100644 index bdb7639..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809ProducerPartitionFactory.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT809.PubSub.Abstractions -{ - /// - /// jt809生产者分区工厂 - /// 分区策略: - /// 1.可以根据(车牌号+颜色)进行分区 - /// 2.可以根据msgId(消息Id)+(车牌号+颜色)进行分区 - /// - public interface IJT809ProducerPartitionFactory - { - int CreatePartition(string topicName, string msgId, string vno_color); - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809HashAlgorithm.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809HashAlgorithm.cs deleted file mode 100644 index 57fb2e1..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809HashAlgorithm.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Security.Cryptography; - -namespace JT809.PubSub.Abstractions -{ - public class JT809HashAlgorithm - { - /// - /// 使用Ketama - /// - /// - /// - /// - public static long Hash(byte[] digest, int nTime=1) - { - long rv = ((long)(digest[3 + nTime * 4] & 0xFF) << 24) - | ((long)(digest[2 + nTime * 4] & 0xFF) << 16) - | ((long)(digest[1 + nTime * 4] & 0xFF) << 8) - | ((long)digest[0 + nTime * 4] & 0xFF); - return rv & 0xffffffffL; - } - - public static byte[] ComputeMd5(string key) - { - using (MD5 md5 = new MD5CryptoServiceProvider()) - { - byte[] keyBytes = md5.ComputeHash(Encoding.UTF8.GetBytes(key)); - md5.Clear(); - return keyBytes; - } - } - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs deleted file mode 100644 index 27e1046..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs +++ /dev/null @@ -1,16 +0,0 @@ -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT809.PubSub.Abstractions -{ - public class JT809PartitionOptions:IOptions - { - public int Partition { get; set; } = 1; - - public JT809PartitionOptions Value => this; - - public List AssignPartitions { get; set; } - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs deleted file mode 100644 index aed8ded..0000000 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT809.PubSub.Abstractions -{ - public class JT809TopicOptions:IOptions - { - public string TopicName { get; set; } = JT809Constants.JT809TopicName; - - public JT809TopicOptions Value => this; - } -} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs index 4ef3f99..367636f 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs @@ -1,7 +1,9 @@ -using JT809.DotNetty.Core.Handlers; +using Google.Protobuf; +using JT809.DotNetty.Core.Handlers; using JT809.DotNetty.Core.Interfaces; using JT809.DotNetty.Core.Metadata; using JT809.GrpcProtos; +using JT809.KafkaService; using JT809.Protocol; using JT809.Protocol.SubMessageBody; using JT809.PubSub.Abstractions; @@ -16,11 +18,11 @@ namespace JT809.Superior.Server { public sealed class JT809SuperiorMsgIdReceiveHandler : JT809SuperiorMsgIdReceiveHandlerBase { - private readonly IJT809ProducerOfT producer; + private readonly JT809_GpsPositio_Producer producer; private readonly JT809GpsOptions gpsOptions; public JT809SuperiorMsgIdReceiveHandler( IOptionsjt809GpsAccessor, - IJT809ProducerOfT producer, + JT809_GpsPositio_Producer producer, ILoggerFactory loggerFactory, IJT809SubordinateLoginService jT809SubordinateLoginService, IJT809VerifyCodeGenerator verifyCodeGenerator) @@ -55,7 +57,7 @@ namespace JT809.Superior.Server gpsBodies.VehiclePosition.Minute, gpsBodies.VehiclePosition.Second).ToUniversalTime().Ticks - 621355968000000000) / 10000000; gpsPosition.FromChannel = gpsOptions.FromChannel; - producer.ProduceAsync($"{0x1202}", $"{exchangeMessageBodies.VehicleNo}{exchangeMessageBodies.VehicleColor}", gpsPosition); + producer.ProduceAsync($"{0x1202}", $"{exchangeMessageBodies.VehicleNo}{exchangeMessageBodies.VehicleColor}", gpsPosition.ToByteArray()); return base.Msg0x1200_0x1202(request); } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs index c30148b..32d0dec 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs @@ -40,8 +40,7 @@ namespace JT809.Superior.Server options.TcpPort = 808; }); services.Configure(hostContext.Configuration.GetSection("JT809GpsOptions")); -#warning AddJT809KafkaProducerPartitionsService - //services.AddJT809KafkaProducerPartitionsService(hostContext.Configuration,options=> options.Partition=10); + services.AddJT809KafkaProducerService(hostContext.Configuration); services.Replace(new ServiceDescriptor(typeof(JT809SuperiorMsgIdReceiveHandlerBase), typeof(JT809SuperiorMsgIdReceiveHandler), ServiceLifetime.Singleton)); }); diff --git a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs index 81f9788..514a0d6 100644 --- a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs +++ b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs @@ -1,5 +1,6 @@ using JT809.DotNetty.Core; using JT809.DotNetty.Core.Configurations; +using JT809.Protocol; using JT809.Protocol.Configs; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -14,15 +15,6 @@ namespace JT809.DotNetty.Host.Test { static async Task Main(string[] args) { -#warning JT809.Protocol.JT809GlobalConfig.Instance - //JT809.Protocol.JT809GlobalConfig.Instance - // .SetHeaderOptions(new JT809HeaderOptions - // { - // MsgGNSSCENTERID = 20141013, - // Version = new JT809.Protocol.JT809Header_Version(1, 0, 0), - // EncryptKey = 9595 - // }); - //主链路登录请求消息 //5B00000048000000851001013353D5010000000000270F0133530D32303134303831333132372E302E302E3100000000000000000000000000000000000000000000001FA3275F5D //主链路注销请求消息 @@ -45,6 +37,12 @@ namespace JT809.DotNetty.Host.Test { services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + services.AddSingleton(new JT809HeaderOptions + { + MsgGNSSCENTERID = 20141013, + Version = new JT809Header_Version(1, 0, 0), + EncryptKey = 9595 + }); services.AddJT809Core(hostContext.Configuration) .AddJT809SuperiorPlatform(options:options=> { options.TcpPort = 839;