Przeglądaj źródła

修改kafka服务

master
SmallChi(Koike) 5 lat temu
rodzic
commit
402a875413
28 zmienionych plików z 386 dodań i 412 usunięć
  1. +19
    -17
      src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs
  2. +2
    -1
      src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809InferiorService.cs
  3. +3
    -2
      src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs
  4. +1
    -2
      src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs
  5. +15
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameConsumerConfig.cs
  6. +15
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameProducerConfig.cs
  7. +0
    -103
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs
  8. +0
    -30
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs
  9. +21
    -30
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
  10. +0
    -67
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs
  11. +0
    -23
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs
  12. +46
    -2
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs
  13. +89
    -2
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs
  14. +89
    -3
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs
  15. +45
    -3
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs
  16. +6
    -3
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs
  17. +3
    -3
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs
  18. +11
    -8
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs
  19. +3
    -3
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs
  20. +2
    -6
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs
  21. +2
    -7
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs
  22. +0
    -17
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809ProducerPartitionFactory.cs
  23. +0
    -35
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809HashAlgorithm.cs
  24. +0
    -16
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs
  25. +0
    -14
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs
  26. +6
    -4
      src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs
  27. +1
    -2
      src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs
  28. +7
    -9
      src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs

+ 19
- 17
src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs Wyświetl plik

@@ -3,6 +3,7 @@ using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Session;
using JT809.Protocol;
using JT809.Protocol.Configs;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using JT809.Protocol.MessageBody;
@@ -20,15 +21,18 @@ namespace JT809.DotNetty.Core.Internal
private readonly JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager;
private readonly ILogger logger;
private readonly JT809Serializer JT809Serializer;
private readonly JT809HeaderOptions JT809HeaderOptions;

public JT809SubordinateLinkNotifyImplService(
JT809HeaderOptions jT809HeaderOptions,
ILoggerFactory loggerFactory,
IJT809Config jT809Config,
IJT809Config jT809Config,
IOptions<JT809Configuration> jT809ConfigurationAccessor,
JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager
)
{
JT809Serializer = jT809Config.GetSerializer();
JT809HeaderOptions = jT809HeaderOptions;
this.logger = loggerFactory.CreateLogger<JT809SubordinateLinkNotifyImplService>();
configuration = jT809ConfigurationAccessor.Value;
this.jT809SuperiorMainSessionManager = jT809SuperiorMainSessionManager;
@@ -36,28 +40,26 @@ namespace JT809.DotNetty.Core.Internal

public void Notify(JT809_0x9007_ReasonCode reasonCode)
{
#warning JT809GlobalConfig
//Notify(reasonCode, JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID);
Notify(reasonCode, JT809HeaderOptions.MsgGNSSCENTERID);
}

public void Notify(JT809_0x9007_ReasonCode reasonCode, uint msgGNSSCENTERID)
{
if (configuration.SubordinateClientEnable)
{
#warning jT809SuperiorMainSessionManager
//var session = jT809SuperiorMainSessionManager.GetSession(JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID);
//if (session != null)
//{
// //发送从链路注销请求
// var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007()
// {
// ReasonCode = reasonCode
// });
// JT809Response jT809Response = new JT809Response(package, 100);
// if (logger.IsEnabled(LogLevel.Information))
// logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
// session.Channel.WriteAndFlushAsync(jT809Response);
//}
var session = jT809SuperiorMainSessionManager.GetSession(msgGNSSCENTERID);
if (session != null)
{
//发送从链路注销请求
var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007()
{
ReasonCode = reasonCode
});
JT809Response jT809Response = new JT809Response(package, 100);
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
session.Channel.WriteAndFlushAsync(jT809Response);
}
}
}
}


+ 2
- 1
src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809InferiorService.cs Wyświetl plik

@@ -11,6 +11,7 @@ using System.Threading.Tasks;
using JT809.Protocol.SubMessageBody;
using JT809.Protocol.Metadata;
using JT809.Protocol.MessageBody;
using JT809.Protocol.Enums;

namespace JT809.Inferior.Client
{
@@ -101,7 +102,7 @@ namespace JT809.Inferior.Client
Vec3 = 12
}
};
var package = JT809.Protocol.Enums.JT809BusinessType.主链路车辆动态信息交换业务.Create(jT809_0X1200);
var package = JT809BusinessType.主链路车辆动态信息交换业务.Create(jT809_0X1200);
mainClient.SendAsync(new JT809Response(package, 256));
logger.LogDebug($"Thread:{Thread.CurrentThread.ManagedThreadId}-4s");
Thread.Sleep(4000);


+ 3
- 2
src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs Wyświetl plik

@@ -1,4 +1,5 @@
using JT809.GrpcProtos;
using JT809.KafkaService;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -13,13 +14,13 @@ namespace JT809.GpsConsumer
{
public class GpsConsumerService : IHostedService
{
private readonly IJT808ConsumerOfT<JT809GpsPosition> jT808Consumer;
private readonly JT809_GpsPosition_Consumer jT808Consumer;

private readonly ILogger logger;

public GpsConsumerService(
ILoggerFactory loggerFactory,
IJT808ConsumerOfT<JT809GpsPosition> jT808Consumer)
JT809_GpsPosition_Consumer jT808Consumer)
{
this.jT808Consumer = jT808Consumer;
logger = loggerFactory.CreateLogger<GpsConsumerService>();


+ 1
- 2
src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs Wyświetl plik

@@ -27,8 +27,7 @@ namespace JT809.GpsConsumer
{
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
#warning AddJT809KafkaConsumerPartitionsService
//services.AddJT809KafkaConsumerPartitionsService(hostContext.Configuration, options => options.Partition = 10);
services.AddJT809KafkaConsumerService(hostContext.Configuration);
services.AddHostedService<GpsConsumerService>();
});



+ 15
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameConsumerConfig.cs Wyświetl plik

@@ -0,0 +1,15 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.KafkaService.Configs
{
public class JT809SameConsumerConfig : ConsumerConfig, IOptions<JT809SameConsumerConfig>
{
public string TopicName { get; set; }

public JT809SameConsumerConfig Value => this;
}
}

+ 15
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809SameProducerConfig.cs Wyświetl plik

@@ -0,0 +1,15 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.KafkaService.Configs
{
public class JT809SameProducerConfig : ProducerConfig,IOptions<JT809SameProducerConfig>
{
public string TopicName { get; set; }

public JT809SameProducerConfig Value => this;
}
}

+ 0
- 103
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs Wyświetl plik

@@ -1,103 +0,0 @@
using Confluent.Kafka;
using JT809.KafkaService.Configs;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace JT809.KafkaService
{
public abstract class JT809Consumer<T> : JT809ConsumerBase<T>
{
private bool _disposed = false;
public override CancellationTokenSource Cts { get; }= new CancellationTokenSource();

protected ILogger logger { get; }

protected override IConsumer<string, T> Consumer { get; }

protected JT809Consumer(
IOptions<JT809ConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
: base(consumerConfigAccessor.Value)
{
logger = loggerFactory.CreateLogger("JT809Consumer");
ConsumerBuilder<string, T> consumerBuilder = new ConsumerBuilder<string, T>(ConsumerConfig);
consumerBuilder.SetErrorHandler((consumer, error) =>
{
logger.LogError(error.Reason);
});
Consumer = consumerBuilder.Build();
}

public override void OnMessage(Action<(string MsgId, T Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
//consumers[n].Assign(topicPartitionList[n]);
var data = Consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, ConsumerConfig.TopicName);
}
catch (Exception ex)
{
logger.LogError(ex, ConsumerConfig.TopicName);
}
}
}, Cts.Token);
}

public override void Subscribe()
{
if (_disposed) return;
//仅有一个分区才需要订阅
Consumer.Subscribe(ConsumerConfig.TopicName);
}

public override void Unsubscribe()
{
if (_disposed) return;
Consumer.Unsubscribe();
}

public override void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);

}

~JT809Consumer()
{
Dispose(false);
}

protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Cts.Cancel();
Consumer.Close();
Consumer.Dispose();
Cts.Dispose();
}
_disposed = true;
}
}
}

+ 0
- 30
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs Wyświetl plik

@@ -1,30 +0,0 @@
using Confluent.Kafka;
using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace JT809.KafkaService
{
public abstract class JT809ConsumerBase<T> : IJT808ConsumerOfT<T>
{
public JT809ConsumerConfig ConsumerConfig { get; }

protected JT809ConsumerBase(IOptions<JT809ConsumerConfig> config)
{
ConsumerConfig = config.Value;
}

public abstract CancellationTokenSource Cts { get; }
protected abstract IConsumer<string, T> Consumer { get; }

public abstract void Dispose();
public abstract void OnMessage(Action<(string MsgId, T Data)> callback);
public abstract void Subscribe();
public abstract void Unsubscribe();
}
}

+ 21
- 30
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs Wyświetl plik

@@ -1,5 +1,6 @@
using Confluent.Kafka;
using JT809.GrpcProtos;
using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@@ -15,43 +16,33 @@ namespace JT809.KafkaService
{
public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig"));
serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT<byte[]>), (service) => {
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>();
#warning JT809_Same_Producer
//return new JT809_Same_Producer(new JT809TopicOptions { TopicName = "jt809.same" }, producerConfig);
return null;
});
serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT<JT809GpsPosition>), (service) => {
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>();
#warning JT809_GpsPositio_Producer
//return new JT809_GpsPositio_Producer(new JT809TopicOptions { TopicName = "jt809.gps" }, producerConfig);
return null;
});
serviceDescriptors.Configure<JT809ProducerConfig>(configuration.GetSection("JT809ProducerConfig"));
serviceDescriptors.AddSingleton<JT809_GpsPositio_Producer>();
return serviceDescriptors;
}


public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig"));
serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT<byte[]>), (service)=> {
var loggerFactory = service.GetRequiredService<ILoggerFactory>();
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>();
consumerConfig.Value.GroupId = "JT809.same.Test";
#warning JT809_Same_Consumer
//return new JT809_Same_Consumer(new JT809TopicOptions { TopicName = "jt809.same" }, consumerConfig, loggerFactory);
return null;
});
serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT<JT809GpsPosition>), (service) => {
var loggerFactory = service.GetRequiredService<ILoggerFactory>();
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>();
consumerConfig.Value.GroupId = "JT809.gps.Test";
#warning JT809_GpsPosition_Consumer
//return new JT809_GpsPosition_Consumer(new JT809TopicOptions { TopicName = "jt809.gps" }, consumerConfig, loggerFactory);
return null;
});
serviceDescriptors.Configure<JT809ConsumerConfig>(configuration.GetSection("JT809ConsumerConfig"));
serviceDescriptors.AddSingleton<JT809_GpsPosition_Consumer>();
return serviceDescriptors;
}

public static IServiceCollection AddJT809KafkaSameProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure<JT809SameProducerConfig>(configuration.GetSection("JT809SameProducerConfig"));
serviceDescriptors.AddSingleton<JT809_GpsPositio_Producer>();
return serviceDescriptors;
}


public static IServiceCollection AddJT809KafkaSameConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure<JT809SameConsumerConfig>(configuration.GetSection("JT809SameConsumerConfig"));
serviceDescriptors.AddSingleton<JT809_Same_Consumer>();
return serviceDescriptors;
}

}
}

+ 0
- 67
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs Wyświetl plik

@@ -1,67 +0,0 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace JT809.KafkaService
{
/// <summary>
///
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class JT809Producer<T> : JT809ProducerBase<T>
{
private bool _disposed = false;

protected virtual IProducer<string, T> CreateProducer()
{
ProducerBuilder<string, T> producerBuilder = new ProducerBuilder<string, T>(ProducerConfig);
return producerBuilder.Build();
}

protected override IProducer<string, T> Producer { get; }

protected JT809Producer(
IOptions<JT809ProducerConfig> producerConfigAccessor)
: base(producerConfigAccessor.Value)
{
Producer = CreateProducer();
}

public override void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);

}

protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Producer.Dispose();
}
_disposed = true;
}

public override async void ProduceAsync(string msgId, string vno_color, T data)
{
if (_disposed) return;
await Producer.ProduceAsync(ProducerConfig.TopicName, new Message<string, T>
{
Key = msgId,
Value = data
});
}

~JT809Producer()
{
Dispose(false);
}
}
}

+ 0
- 23
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs Wyświetl plik

@@ -1,23 +0,0 @@
using Confluent.Kafka;
using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.KafkaService
{
public abstract class JT809ProducerBase<T> : IJT809ProducerOfT<T>
{
protected JT809ProducerBase(IOptions<JT809ProducerConfig> config)
{
ProducerConfig = config.Value;
}

public JT809ProducerConfig ProducerConfig { get;}
protected abstract IProducer<string, T> Producer { get;}
public abstract void Dispose();
public abstract void ProduceAsync(string msgId, string vno_color, T data);
}
}

+ 46
- 2
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs Wyświetl plik

@@ -4,13 +4,57 @@ using JT809.GrpcProtos;
using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
using System;

namespace JT809.KafkaService
{
public sealed class JT809_GpsPositio_Producer : JT809Producer<JT809GpsPosition>
public sealed class JT809_GpsPositio_Producer : IJT809Producer
{
public JT809_GpsPositio_Producer(IOptions<JT809ProducerConfig> producerConfigAccessor) : base( producerConfigAccessor)
private readonly JT809ProducerConfig JT809ProducerConfig;

private IProducer<string, byte[]> Producer;

private bool _disposed = false;

public JT809_GpsPositio_Producer(IOptions<JT809ProducerConfig> producerConfigAccessor)
{
JT809ProducerConfig = producerConfigAccessor.Value;
ProducerBuilder<string, byte[]> producerBuilder = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value);
Producer = producerBuilder.Build();
TopicName = JT809ProducerConfig.TopicName;
}

public string TopicName { get; }

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);
}

void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Producer.Dispose();
}
_disposed = true;
}

public async void ProduceAsync(string msgId, string vno_color, byte[] data)
{
if (_disposed) return;
await Producer.ProduceAsync(JT809ProducerConfig.TopicName, new Message<string, byte[]>
{
Key = msgId,
Value = data
});
}

~JT809_GpsPositio_Producer()
{
Dispose(false);
}
}
}

+ 89
- 2
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs Wyświetl plik

@@ -12,10 +12,97 @@ using System.Threading.Tasks;

namespace JT809.KafkaService
{
public sealed class JT809_GpsPosition_Consumer : JT809Consumer<JT809GpsPosition>
public sealed class JT809_GpsPosition_Consumer : IJT809Consumer
{
public JT809_GpsPosition_Consumer(IOptions<JT809ConsumerConfig> consumerConfigAccessor, ILoggerFactory loggerFactory) : base( consumerConfigAccessor, loggerFactory)
private bool _disposed = false;
public CancellationTokenSource Cts { get; } = new CancellationTokenSource();
private ILogger logger { get; }
public string TopicName { get; }

private IConsumer<string, byte[]> Consumer;
private readonly JT809ConsumerConfig JT809ConsumerConfig;

public JT809_GpsPosition_Consumer(
IOptions<JT809ConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger("JT809_GpsPosition_Consumer");
JT809ConsumerConfig = consumerConfigAccessor.Value;
TopicName = JT809ConsumerConfig.TopicName;
ConsumerBuilder<string, byte[]> consumerBuilder = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value);
consumerBuilder.SetErrorHandler((consumer, error) =>
{
logger.LogError(error.Reason);
});
Consumer = consumerBuilder.Build();
}

public void OnMessage(Action<(string MsgId, byte[] Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
//consumers[n].Assign(topicPartitionList[n]);
var data = Consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, JT809ConsumerConfig.TopicName);
}
catch (Exception ex)
{
logger.LogError(ex, JT809ConsumerConfig.TopicName);
}
}
}, Cts.Token);
}

public void Subscribe()
{
if (_disposed) return;
//仅有一个分区才需要订阅
Consumer.Subscribe(JT809ConsumerConfig.TopicName);
}

public void Unsubscribe()
{
if (_disposed) return;
Consumer.Unsubscribe();
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);

}

~JT809_GpsPosition_Consumer()
{
Dispose(false);
}

void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Cts.Cancel();
Consumer.Close();
Consumer.Dispose();
Cts.Dispose();
}
_disposed = true;
}
}
}

+ 89
- 3
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs Wyświetl plik

@@ -12,11 +12,97 @@ using System.Threading.Tasks;

namespace JT809.KafkaService
{
public sealed class JT809_Same_Consumer : JT809Consumer<byte[]>
public sealed class JT809_Same_Consumer : IJT809Consumer
{
public JT809_Same_Consumer(IOptions<JT809ConsumerConfig> consumerConfigAccessor, ILoggerFactory loggerFactory)
: base(consumerConfigAccessor, loggerFactory)
private bool _disposed = false;
public CancellationTokenSource Cts { get; } = new CancellationTokenSource();
private ILogger logger { get; }
public string TopicName { get; }

private IConsumer<string, byte[]> Consumer;
private readonly JT809SameConsumerConfig JT809ConsumerConfig;

public JT809_Same_Consumer(
IOptions<JT809SameConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger("JT809_Same_Consumer");
JT809ConsumerConfig = consumerConfigAccessor.Value;
TopicName = JT809ConsumerConfig.TopicName;
ConsumerBuilder<string, byte[]> consumerBuilder = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value);
consumerBuilder.SetErrorHandler((consumer, error) =>
{
logger.LogError(error.Reason);
});
Consumer = consumerBuilder.Build();
}

public void OnMessage(Action<(string MsgId, byte[] Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
//consumers[n].Assign(topicPartitionList[n]);
var data = Consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, JT809ConsumerConfig.TopicName);
}
catch (Exception ex)
{
logger.LogError(ex, JT809ConsumerConfig.TopicName);
}
}
}, Cts.Token);
}

public void Subscribe()
{
if (_disposed) return;
//仅有一个分区才需要订阅
Consumer.Subscribe(JT809ConsumerConfig.TopicName);
}

public void Unsubscribe()
{
if (_disposed) return;
Consumer.Unsubscribe();
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);

}

~JT809_Same_Consumer()
{
Dispose(false);
}

void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Cts.Cancel();
Consumer.Close();
Consumer.Dispose();
Cts.Dispose();
}
_disposed = true;
}
}
}

+ 45
- 3
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs Wyświetl plik

@@ -8,11 +8,53 @@ using Microsoft.Extensions.Options;

namespace JT809.KafkaService
{
public sealed class JT809_Same_Producer : JT809Producer<byte[]>
public sealed class JT809_Same_Producer : IJT809Producer
{
public JT809_Same_Producer(IOptions<JT809ProducerConfig> producerConfigAccessor)
: base(producerConfigAccessor)
private readonly JT809SameProducerConfig JT809SameProducerConfig;

private IProducer<string, byte[]> Producer;

private bool _disposed = false;

public JT809_Same_Producer(IOptions<JT809SameProducerConfig> producerConfigAccessor)
{
JT809SameProducerConfig = producerConfigAccessor.Value;
ProducerBuilder<string, byte[]> producerBuilder = new ProducerBuilder<string,byte[]>(producerConfigAccessor.Value);
Producer= producerBuilder.Build();
TopicName = JT809SameProducerConfig.Value.TopicName;
}

public string TopicName { get; }

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);
}

void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Producer.Dispose();
}
_disposed = true;
}

public async void ProduceAsync(string msgId, string vno_color, byte[] data)
{
if (_disposed) return;
await Producer.ProduceAsync(JT809SameProducerConfig.TopicName, new Message<string, byte[]>
{
Key = msgId,
Value = data
});
}

~JT809_Same_Producer()
{
Dispose(false);
}
}
}

+ 6
- 3
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs Wyświetl plik

@@ -4,6 +4,8 @@ using Microsoft.Extensions.DependencyInjection;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using System.Threading;
using Google.Protobuf;
using JT809.GrpcProtos;

namespace JT809.KafkaServiceTest
{
@@ -16,9 +18,10 @@ namespace JT809.KafkaServiceTest
consumerTestService.GpsConsumer.OnMessage((Message)=>
{
Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId);
Assert.Equal("粤A23456", Message.Data.Vno);
Assert.Equal(2, Message.Data.VColor);
Assert.Equal("smallchi", Message.Data.FromChannel);
JT809GpsPosition jT809GpsPosition = JT809GpsPosition.Parser.ParseFrom(Message.Data);
Assert.Equal("粤A23456", jT809GpsPosition.Vno);
Assert.Equal(2, jT809GpsPosition.VColor);
Assert.Equal("smallchi", jT809GpsPosition.FromChannel);
});
consumerTestService.GpsConsumer.Subscribe();



+ 3
- 3
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs Wyświetl plik

@@ -9,9 +9,9 @@ namespace JT809.KafkaServiceTest
{
public class ConsumerTestService
{
public IJT808ConsumerOfT<byte[]> SameConsumer { get; }
public IJT808ConsumerOfT<JT809GpsPosition> GpsConsumer { get; }
public ConsumerTestService(IJT808ConsumerOfT<byte[]> sameConsumer, IJT808ConsumerOfT<JT809GpsPosition> gpsConsumer)
public JT809_Same_Consumer SameConsumer { get; }
public JT809_GpsPosition_Consumer GpsConsumer { get; }
public ConsumerTestService(JT809_Same_Consumer sameConsumer, JT809_GpsPosition_Consumer gpsConsumer)
{
SameConsumer = sameConsumer;
GpsConsumer = gpsConsumer;


+ 11
- 8
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs Wyświetl plik

@@ -3,6 +3,7 @@ using Xunit;
using Microsoft.Extensions.DependencyInjection;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using Google.Protobuf;

namespace JT809.KafkaServiceTest
{
@@ -12,19 +13,21 @@ namespace JT809.KafkaServiceTest
public void Test1()
{
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2", new GrpcProtos.JT809GpsPosition
{
Vno= "粤A23456",
VColor=2,
GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
FromChannel="smallchi"
});

producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2",
new GrpcProtos.JT809GpsPosition
{
Vno = "粤A23456",
VColor = 2,
GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
FromChannel = "smallchi"
}.ToByteArray());
}

[Fact]
public void Test2()
{
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23457_2", new byte[] { 0x01, 0x02, 0x03 });
}
}


+ 3
- 3
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs Wyświetl plik

@@ -9,9 +9,9 @@ namespace JT809.KafkaServiceTest
{
public class ProducerTestService
{
public IJT809ProducerOfT<byte[]> SameProducer { get; }
public IJT809ProducerOfT<JT809GpsPosition> GpsProducer { get; }
public ProducerTestService(IJT809ProducerOfT<byte[]> sameProducer, IJT809ProducerOfT<JT809GpsPosition> gpsProducer)
public JT809_Same_Producer SameProducer { get; }
public JT809_GpsPositio_Producer GpsProducer { get; }
public ProducerTestService(JT809_Same_Producer sameProducer, JT809_GpsPositio_Producer gpsProducer)
{
SameProducer = sameProducer;
GpsProducer = gpsProducer;


+ 2
- 6
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs Wyświetl plik

@@ -5,13 +5,9 @@ using System.Threading;

namespace JT809.PubSub.Abstractions
{
public interface IJT809Consumer : IJT809PubSub, IJT808ConsumerOfT<byte[]>
public interface IJT809Consumer : IJT809PubSub
{

}
public interface IJT808ConsumerOfT<T> :IDisposable
{
void OnMessage(Action<(string MsgId, T Data)> callback);
void OnMessage(Action<(string MsgId, byte[] Data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();


+ 2
- 7
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs Wyświetl plik

@@ -3,12 +3,7 @@ using System.Threading.Tasks;

namespace JT809.PubSub.Abstractions
{
public interface IJT809Producer: IJT809PubSub, IJT809ProducerOfT<byte[]>
{
}

public interface IJT809ProducerOfT<T>: IDisposable
public interface IJT809Producer: IJT809PubSub, IDisposable
{
/// <summary>
///
@@ -16,6 +11,6 @@ namespace JT809.PubSub.Abstractions
/// <param name="msgId">消息Id</param>
/// <param name="vno_color">车牌号+车牌颜色</param>
/// <param name="data">hex data</param>
void ProduceAsync(string msgId, string vno_color, T data);
void ProduceAsync(string msgId, string vno_color, byte[] data);
}
}

+ 0
- 17
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809ProducerPartitionFactory.cs Wyświetl plik

@@ -1,17 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.PubSub.Abstractions
{
/// <summary>
/// jt809生产者分区工厂
/// 分区策略:
/// 1.可以根据(车牌号+颜色)进行分区
/// 2.可以根据msgId(消息Id)+(车牌号+颜色)进行分区
/// </summary>
public interface IJT809ProducerPartitionFactory
{
int CreatePartition(string topicName, string msgId, string vno_color);
}
}

+ 0
- 35
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809HashAlgorithm.cs Wyświetl plik

@@ -1,35 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Security.Cryptography;

namespace JT809.PubSub.Abstractions
{
public class JT809HashAlgorithm
{
/// <summary>
/// 使用Ketama
/// </summary>
/// <param name="digest"></param>
/// <param name="nTime"></param>
/// <returns></returns>
public static long Hash(byte[] digest, int nTime=1)
{
long rv = ((long)(digest[3 + nTime * 4] & 0xFF) << 24)
| ((long)(digest[2 + nTime * 4] & 0xFF) << 16)
| ((long)(digest[1 + nTime * 4] & 0xFF) << 8)
| ((long)digest[0 + nTime * 4] & 0xFF);
return rv & 0xffffffffL;
}

public static byte[] ComputeMd5(string key)
{
using (MD5 md5 = new MD5CryptoServiceProvider())
{
byte[] keyBytes = md5.ComputeHash(Encoding.UTF8.GetBytes(key));
md5.Clear();
return keyBytes;
}
}
}
}

+ 0
- 16
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs Wyświetl plik

@@ -1,16 +0,0 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.PubSub.Abstractions
{
public class JT809PartitionOptions:IOptions<JT809PartitionOptions>
{
public int Partition { get; set; } = 1;

public JT809PartitionOptions Value => this;

public List<int> AssignPartitions { get; set; }
}
}

+ 0
- 14
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs Wyświetl plik

@@ -1,14 +0,0 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.PubSub.Abstractions
{
public class JT809TopicOptions:IOptions<JT809TopicOptions>
{
public string TopicName { get; set; } = JT809Constants.JT809TopicName;

public JT809TopicOptions Value => this;
}
}

+ 6
- 4
src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs Wyświetl plik

@@ -1,7 +1,9 @@
using JT809.DotNetty.Core.Handlers;
using Google.Protobuf;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Metadata;
using JT809.GrpcProtos;
using JT809.KafkaService;
using JT809.Protocol;
using JT809.Protocol.SubMessageBody;
using JT809.PubSub.Abstractions;
@@ -16,11 +18,11 @@ namespace JT809.Superior.Server
{
public sealed class JT809SuperiorMsgIdReceiveHandler : JT809SuperiorMsgIdReceiveHandlerBase
{
private readonly IJT809ProducerOfT<JT809GpsPosition> producer;
private readonly JT809_GpsPositio_Producer producer;
private readonly JT809GpsOptions gpsOptions;
public JT809SuperiorMsgIdReceiveHandler(
IOptions<JT809GpsOptions>jt809GpsAccessor,
IJT809ProducerOfT<JT809GpsPosition> producer,
JT809_GpsPositio_Producer producer,
ILoggerFactory loggerFactory,
IJT809SubordinateLoginService jT809SubordinateLoginService,
IJT809VerifyCodeGenerator verifyCodeGenerator)
@@ -55,7 +57,7 @@ namespace JT809.Superior.Server
gpsBodies.VehiclePosition.Minute,
gpsBodies.VehiclePosition.Second).ToUniversalTime().Ticks - 621355968000000000) / 10000000;
gpsPosition.FromChannel = gpsOptions.FromChannel;
producer.ProduceAsync($"{0x1202}", $"{exchangeMessageBodies.VehicleNo}{exchangeMessageBodies.VehicleColor}", gpsPosition);
producer.ProduceAsync($"{0x1202}", $"{exchangeMessageBodies.VehicleNo}{exchangeMessageBodies.VehicleColor}", gpsPosition.ToByteArray());
return base.Msg0x1200_0x1202(request);
}
}


+ 1
- 2
src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs Wyświetl plik

@@ -40,8 +40,7 @@ namespace JT809.Superior.Server
options.TcpPort = 808;
});
services.Configure<JT809GpsOptions>(hostContext.Configuration.GetSection("JT809GpsOptions"));
#warning AddJT809KafkaProducerPartitionsService
//services.AddJT809KafkaProducerPartitionsService(hostContext.Configuration,options=> options.Partition=10);
services.AddJT809KafkaProducerService(hostContext.Configuration);
services.Replace(new ServiceDescriptor(typeof(JT809SuperiorMsgIdReceiveHandlerBase), typeof(JT809SuperiorMsgIdReceiveHandler), ServiceLifetime.Singleton));
});



+ 7
- 9
src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs Wyświetl plik

@@ -1,5 +1,6 @@
using JT809.DotNetty.Core;
using JT809.DotNetty.Core.Configurations;
using JT809.Protocol;
using JT809.Protocol.Configs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@@ -14,15 +15,6 @@ namespace JT809.DotNetty.Host.Test
{
static async Task Main(string[] args)
{
#warning JT809.Protocol.JT809GlobalConfig.Instance
//JT809.Protocol.JT809GlobalConfig.Instance
// .SetHeaderOptions(new JT809HeaderOptions
// {
// MsgGNSSCENTERID = 20141013,
// Version = new JT809.Protocol.JT809Header_Version(1, 0, 0),
// EncryptKey = 9595
// });

//主链路登录请求消息
//5B00000048000000851001013353D5010000000000270F0133530D32303134303831333132372E302E302E3100000000000000000000000000000000000000000000001FA3275F5D
//主链路注销请求消息
@@ -45,6 +37,12 @@ namespace JT809.DotNetty.Host.Test
{
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddSingleton(new JT809HeaderOptions
{
MsgGNSSCENTERID = 20141013,
Version = new JT809Header_Version(1, 0, 0),
EncryptKey = 9595
});
services.AddJT809Core(hostContext.Configuration)
.AddJT809SuperiorPlatform(options:options=> {
options.TcpPort = 839;


Ładowanie…
Anuluj
Zapisz