Quellcode durchsuchen

1.增加实时定位数据结构解析

2.增加分区策略、包装生产消费模式
3.修改文档
tags/old
SmallChi vor 6 Jahren
Ursprung
Commit
586b91f518
19 geänderte Dateien mit 1138 neuen und 3 gelöschten Zeilen
  1. +1
    -1
      README.md
  2. +10
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/JT809.GrpcProtos.csproj
  3. +586
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/JT809GpsPosition.cs
  4. +35
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/Protos/JT809GpsPosition.proto
  5. +1
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/Protos/generate.txt
  6. +18
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809.KafkaService.csproj
  7. +138
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs
  8. +41
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs
  9. +120
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs
  10. +25
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Partitions/JT809GpsPositionProducerPartitionFactoryImpl.cs
  11. +19
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs
  12. +21
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs
  13. +17
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809ProducerPartitionFactory.cs
  14. +7
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809PubSub.cs
  15. +11
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809.PubSub.Abstractions.csproj
  16. +11
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809Constants.cs
  17. +35
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809HashAlgorithm.cs
  18. +14
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs
  19. +28
    -2
      src/JT809.DotNetty.sln

+ 1
- 1
README.md Datei anzeigen

@@ -1,4 +1,4 @@
# JT809Netty
# JT809DotNetty

由于脑子不够用,对于双链路的理解不是很到位,先解决目前需要对接的车辆数据。



+ 10
- 0
src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/JT809.GrpcProtos.csproj Datei anzeigen

@@ -0,0 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.7.0" />
</ItemGroup>

</Project>

+ 586
- 0
src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/JT809GpsPosition.cs Datei anzeigen

@@ -0,0 +1,586 @@
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Protos/JT809GpsPosition.proto
// </auto-generated>
#pragma warning disable 1591, 0612, 3021
#region Designer generated code

using pb = global::Google.Protobuf;
using pbc = global::Google.Protobuf.Collections;
using pbr = global::Google.Protobuf.Reflection;
using scg = global::System.Collections.Generic;
namespace JT809.GrpcProtos {

/// <summary>Holder for reflection information generated from Protos/JT809GpsPosition.proto</summary>
public static partial class JT809GpsPositionReflection {

#region Descriptor
/// <summary>File descriptor for Protos/JT809GpsPosition.proto</summary>
public static pbr::FileDescriptor Descriptor {
get { return descriptor; }
}
private static pbr::FileDescriptor descriptor;

static JT809GpsPositionReflection() {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
"Ch1Qcm90b3MvSlQ4MDlHcHNQb3NpdGlvbi5wcm90bxIQSlQ4MDkuR3JwY1By",
"b3RvcyLtAQoQSlQ4MDlHcHNQb3NpdGlvbhILCgNWbm8YASABKAkSDgoGVkNv",
"bG9yGAIgASgFEhMKC0Zyb21DaGFubmVsGAMgASgJEg8KB0VuY3J5cHQYBCAB",
"KAUSDwoHR3BzVGltZRgFIAEoAxILCgNMb24YBiABKAMSCwoDTGF0GAcgASgD",
"EgwKBFZlYzEYDSABKAUSDAoEVmVjMhgOIAEoBRIMCgRWZWMzGA8gASgFEhEK",
"CURpcmVjdGlvbhgQIAEoBRIQCghBbHRpdHVkZRgRIAEoBRINCgVTdGF0ZRgS",
"IAEoBRINCgVBbGFybRgTIAEoBWIGcHJvdG8z"));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::JT809.GrpcProtos.JT809GpsPosition), global::JT809.GrpcProtos.JT809GpsPosition.Parser, new[]{ "Vno", "VColor", "FromChannel", "Encrypt", "GpsTime", "Lon", "Lat", "Vec1", "Vec2", "Vec3", "Direction", "Altitude", "State", "Alarm" }, null, null, null)
}));
}
#endregion

}
#region Messages
/// <summary>
/// 809车辆定位信息
/// </summary>
public sealed partial class JT809GpsPosition : pb::IMessage<JT809GpsPosition> {
private static readonly pb::MessageParser<JT809GpsPosition> _parser = new pb::MessageParser<JT809GpsPosition>(() => new JT809GpsPosition());
private pb::UnknownFieldSet _unknownFields;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pb::MessageParser<JT809GpsPosition> Parser { get { return _parser; } }

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
get { return global::JT809.GrpcProtos.JT809GpsPositionReflection.Descriptor.MessageTypes[0]; }
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
pbr::MessageDescriptor pb::IMessage.Descriptor {
get { return Descriptor; }
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public JT809GpsPosition() {
OnConstruction();
}

partial void OnConstruction();

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public JT809GpsPosition(JT809GpsPosition other) : this() {
vno_ = other.vno_;
vColor_ = other.vColor_;
fromChannel_ = other.fromChannel_;
encrypt_ = other.encrypt_;
gpsTime_ = other.gpsTime_;
lon_ = other.lon_;
lat_ = other.lat_;
vec1_ = other.vec1_;
vec2_ = other.vec2_;
vec3_ = other.vec3_;
direction_ = other.direction_;
altitude_ = other.altitude_;
state_ = other.state_;
alarm_ = other.alarm_;
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public JT809GpsPosition Clone() {
return new JT809GpsPosition(this);
}

/// <summary>Field number for the "Vno" field.</summary>
public const int VnoFieldNumber = 1;
private string vno_ = "";
/// <summary>
/// 车牌号
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public string Vno {
get { return vno_; }
set {
vno_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}

/// <summary>Field number for the "VColor" field.</summary>
public const int VColorFieldNumber = 2;
private int vColor_;
/// <summary>
/// 车牌颜色
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int VColor {
get { return vColor_; }
set {
vColor_ = value;
}
}

/// <summary>Field number for the "FromChannel" field.</summary>
public const int FromChannelFieldNumber = 3;
private string fromChannel_ = "";
/// <summary>
/// 来源
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public string FromChannel {
get { return fromChannel_; }
set {
fromChannel_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}

/// <summary>Field number for the "Encrypt" field.</summary>
public const int EncryptFieldNumber = 4;
private int encrypt_;
/// <summary>
/// 该字段标识传输的定位信息是否使用国家测绘局批准的地图保密插件进行加密。
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int Encrypt {
get { return encrypt_; }
set {
encrypt_ = value;
}
}

/// <summary>Field number for the "GpsTime" field.</summary>
public const int GpsTimeFieldNumber = 5;
private long gpsTime_;
/// <summary>
/// 定位时间
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public long GpsTime {
get { return gpsTime_; }
set {
gpsTime_ = value;
}
}

/// <summary>Field number for the "Lon" field.</summary>
public const int LonFieldNumber = 6;
private long lon_;
/// <summary>
/// 经度,单位为 1*10^-6 度。
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public long Lon {
get { return lon_; }
set {
lon_ = value;
}
}

/// <summary>Field number for the "Lat" field.</summary>
public const int LatFieldNumber = 7;
private long lat_;
/// <summary>
/// 纬度,单位为 1*10^-6 度。
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public long Lat {
get { return lat_; }
set {
lat_ = value;
}
}

/// <summary>Field number for the "Vec1" field.</summary>
public const int Vec1FieldNumber = 13;
private int vec1_;
/// <summary>
/// 速度,指卫星定位车载终端设备上传的行车速度信息,为必填项。单位为千米每小时(km/h)。
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int Vec1 {
get { return vec1_; }
set {
vec1_ = value;
}
}

/// <summary>Field number for the "Vec2" field.</summary>
public const int Vec2FieldNumber = 14;
private int vec2_;
/// <summary>
/// 行驶记录速度,指车辆行驶记录设备上传的行车速度 信息,为必填项。单位为千米每小时(km/h)。
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int Vec2 {
get { return vec2_; }
set {
vec2_ = value;
}
}

/// <summary>Field number for the "Vec3" field.</summary>
public const int Vec3FieldNumber = 15;
private int vec3_;
/// <summary>
/// 车辆当前总里程数,值车辆上传的行车里程数。单位单位为千米(km)
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int Vec3 {
get { return vec3_; }
set {
vec3_ = value;
}
}

/// <summary>Field number for the "Direction" field.</summary>
public const int DirectionFieldNumber = 16;
private int direction_;
/// <summary>
/// 方向,0-359,单位为度(。),正北为 0,顺时针。
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int Direction {
get { return direction_; }
set {
direction_ = value;
}
}

/// <summary>Field number for the "Altitude" field.</summary>
public const int AltitudeFieldNumber = 17;
private int altitude_;
/// <summary>
/// 海拔高度,单位为米(m)。
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int Altitude {
get { return altitude_; }
set {
altitude_ = value;
}
}

/// <summary>Field number for the "State" field.</summary>
public const int StateFieldNumber = 18;
private int state_;
/// <summary>
/// 车辆状态
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int State {
get { return state_; }
set {
state_ = value;
}
}

/// <summary>Field number for the "Alarm" field.</summary>
public const int AlarmFieldNumber = 19;
private int alarm_;
/// <summary>
/// 报警状态
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int Alarm {
get { return alarm_; }
set {
alarm_ = value;
}
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public override bool Equals(object other) {
return Equals(other as JT809GpsPosition);
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public bool Equals(JT809GpsPosition other) {
if (ReferenceEquals(other, null)) {
return false;
}
if (ReferenceEquals(other, this)) {
return true;
}
if (Vno != other.Vno) return false;
if (VColor != other.VColor) return false;
if (FromChannel != other.FromChannel) return false;
if (Encrypt != other.Encrypt) return false;
if (GpsTime != other.GpsTime) return false;
if (Lon != other.Lon) return false;
if (Lat != other.Lat) return false;
if (Vec1 != other.Vec1) return false;
if (Vec2 != other.Vec2) return false;
if (Vec3 != other.Vec3) return false;
if (Direction != other.Direction) return false;
if (Altitude != other.Altitude) return false;
if (State != other.State) return false;
if (Alarm != other.Alarm) return false;
return Equals(_unknownFields, other._unknownFields);
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public override int GetHashCode() {
int hash = 1;
if (Vno.Length != 0) hash ^= Vno.GetHashCode();
if (VColor != 0) hash ^= VColor.GetHashCode();
if (FromChannel.Length != 0) hash ^= FromChannel.GetHashCode();
if (Encrypt != 0) hash ^= Encrypt.GetHashCode();
if (GpsTime != 0L) hash ^= GpsTime.GetHashCode();
if (Lon != 0L) hash ^= Lon.GetHashCode();
if (Lat != 0L) hash ^= Lat.GetHashCode();
if (Vec1 != 0) hash ^= Vec1.GetHashCode();
if (Vec2 != 0) hash ^= Vec2.GetHashCode();
if (Vec3 != 0) hash ^= Vec3.GetHashCode();
if (Direction != 0) hash ^= Direction.GetHashCode();
if (Altitude != 0) hash ^= Altitude.GetHashCode();
if (State != 0) hash ^= State.GetHashCode();
if (Alarm != 0) hash ^= Alarm.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
return hash;
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public override string ToString() {
return pb::JsonFormatter.ToDiagnosticString(this);
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void WriteTo(pb::CodedOutputStream output) {
if (Vno.Length != 0) {
output.WriteRawTag(10);
output.WriteString(Vno);
}
if (VColor != 0) {
output.WriteRawTag(16);
output.WriteInt32(VColor);
}
if (FromChannel.Length != 0) {
output.WriteRawTag(26);
output.WriteString(FromChannel);
}
if (Encrypt != 0) {
output.WriteRawTag(32);
output.WriteInt32(Encrypt);
}
if (GpsTime != 0L) {
output.WriteRawTag(40);
output.WriteInt64(GpsTime);
}
if (Lon != 0L) {
output.WriteRawTag(48);
output.WriteInt64(Lon);
}
if (Lat != 0L) {
output.WriteRawTag(56);
output.WriteInt64(Lat);
}
if (Vec1 != 0) {
output.WriteRawTag(104);
output.WriteInt32(Vec1);
}
if (Vec2 != 0) {
output.WriteRawTag(112);
output.WriteInt32(Vec2);
}
if (Vec3 != 0) {
output.WriteRawTag(120);
output.WriteInt32(Vec3);
}
if (Direction != 0) {
output.WriteRawTag(128, 1);
output.WriteInt32(Direction);
}
if (Altitude != 0) {
output.WriteRawTag(136, 1);
output.WriteInt32(Altitude);
}
if (State != 0) {
output.WriteRawTag(144, 1);
output.WriteInt32(State);
}
if (Alarm != 0) {
output.WriteRawTag(152, 1);
output.WriteInt32(Alarm);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int CalculateSize() {
int size = 0;
if (Vno.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(Vno);
}
if (VColor != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(VColor);
}
if (FromChannel.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(FromChannel);
}
if (Encrypt != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(Encrypt);
}
if (GpsTime != 0L) {
size += 1 + pb::CodedOutputStream.ComputeInt64Size(GpsTime);
}
if (Lon != 0L) {
size += 1 + pb::CodedOutputStream.ComputeInt64Size(Lon);
}
if (Lat != 0L) {
size += 1 + pb::CodedOutputStream.ComputeInt64Size(Lat);
}
if (Vec1 != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(Vec1);
}
if (Vec2 != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(Vec2);
}
if (Vec3 != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(Vec3);
}
if (Direction != 0) {
size += 2 + pb::CodedOutputStream.ComputeInt32Size(Direction);
}
if (Altitude != 0) {
size += 2 + pb::CodedOutputStream.ComputeInt32Size(Altitude);
}
if (State != 0) {
size += 2 + pb::CodedOutputStream.ComputeInt32Size(State);
}
if (Alarm != 0) {
size += 2 + pb::CodedOutputStream.ComputeInt32Size(Alarm);
}
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
return size;
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void MergeFrom(JT809GpsPosition other) {
if (other == null) {
return;
}
if (other.Vno.Length != 0) {
Vno = other.Vno;
}
if (other.VColor != 0) {
VColor = other.VColor;
}
if (other.FromChannel.Length != 0) {
FromChannel = other.FromChannel;
}
if (other.Encrypt != 0) {
Encrypt = other.Encrypt;
}
if (other.GpsTime != 0L) {
GpsTime = other.GpsTime;
}
if (other.Lon != 0L) {
Lon = other.Lon;
}
if (other.Lat != 0L) {
Lat = other.Lat;
}
if (other.Vec1 != 0) {
Vec1 = other.Vec1;
}
if (other.Vec2 != 0) {
Vec2 = other.Vec2;
}
if (other.Vec3 != 0) {
Vec3 = other.Vec3;
}
if (other.Direction != 0) {
Direction = other.Direction;
}
if (other.Altitude != 0) {
Altitude = other.Altitude;
}
if (other.State != 0) {
State = other.State;
}
if (other.Alarm != 0) {
Alarm = other.Alarm;
}
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void MergeFrom(pb::CodedInputStream input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
case 10: {
Vno = input.ReadString();
break;
}
case 16: {
VColor = input.ReadInt32();
break;
}
case 26: {
FromChannel = input.ReadString();
break;
}
case 32: {
Encrypt = input.ReadInt32();
break;
}
case 40: {
GpsTime = input.ReadInt64();
break;
}
case 48: {
Lon = input.ReadInt64();
break;
}
case 56: {
Lat = input.ReadInt64();
break;
}
case 104: {
Vec1 = input.ReadInt32();
break;
}
case 112: {
Vec2 = input.ReadInt32();
break;
}
case 120: {
Vec3 = input.ReadInt32();
break;
}
case 128: {
Direction = input.ReadInt32();
break;
}
case 136: {
Altitude = input.ReadInt32();
break;
}
case 144: {
State = input.ReadInt32();
break;
}
case 152: {
Alarm = input.ReadInt32();
break;
}
}
}
}

}

#endregion

}

#endregion Designer generated code

+ 35
- 0
src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/Protos/JT809GpsPosition.proto Datei anzeigen

@@ -0,0 +1,35 @@
syntax = "proto3";

package JT809.GrpcProtos;

// 809车辆定位信息
message JT809GpsPosition {
// 车牌号
string Vno = 1;
// 车牌颜色
int32 VColor = 2;
// 来源
string FromChannel = 3;
// 该字段标识传输的定位信息是否使用国家测绘局批准的地图保密插件进行加密。
int32 Encrypt = 4;
// 定位时间
int64 GpsTime = 5;
// 经度,单位为 1*10^-6 度。
int64 Lon = 6;
// 纬度,单位为 1*10^-6 度。
int64 Lat = 7;
// 速度,指卫星定位车载终端设备上传的行车速度信息,为必填项。单位为千米每小时(km/h)。
int32 Vec1 = 13;
// 行驶记录速度,指车辆行驶记录设备上传的行车速度 信息,为必填项。单位为千米每小时(km/h)。
int32 Vec2 = 14;
// 车辆当前总里程数,值车辆上传的行车里程数。单位单位为千米(km)
int32 Vec3 = 15;
// 方向,0-359,单位为度(。),正北为 0,顺时针。
int32 Direction = 16;
// 海拔高度,单位为米(m)。
int32 Altitude = 17;
// 车辆状态
int32 State = 18;
// 报警状态
int32 Alarm = 19;
}

+ 1
- 0
src/JT809.DotNetty.Simples/Superior/JT809.GrpcProtos/Protos/generate.txt Datei anzeigen

@@ -0,0 +1 @@
C:\Users\Administrator\.nuget\packages\grpc.tools\1.14.2\tools\windows_x64\protoc.exe -I JT809.GrpcProtos --csharp_out JT809.GrpcProtos JT809.GrpcProtos\Protos\JT809GpsPosition.proto --grpc_out JT809.GrpcProtos --plugin=protoc-gen-grpc=C:\Users\Administrator\.nuget\packages\grpc.tools\1.14.2\tools\windows_x64\grpc_csharp_plugin.exe;

+ 18
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809.KafkaService.csproj Datei anzeigen

@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.0.0-RC4" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT809.GrpcProtos\JT809.GrpcProtos.csproj" />
<ProjectReference Include="..\JT809.PubSub.Abstractions\JT809.PubSub.Abstractions.csproj" />
</ItemGroup>

</Project>

+ 138
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs Datei anzeigen

@@ -0,0 +1,138 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using JT809.PubSub.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace JT809.KafkaService
{
public abstract class JT809Producer<T> : IJT809ProducerOfT<T>
{
private bool _disposed = false;

public virtual string TopicName => JT809Constants.JT809TopicName;

private ConcurrentDictionary<string, TopicPartition> TopicPartitionCache;

private IProducer<string, T> producer;

protected virtual IJT809ProducerPartitionFactory ProducerPartitionFactory { get; }

protected virtual JT809PartitionOptions PartitionOptions { get; }

protected abstract ProducerConfig ProducerConfig { get; }

protected JT809Producer()
{
CreateProducer();
if (PartitionOptions != null)
{
TopicPartitionCache = new ConcurrentDictionary<string, TopicPartition>();
if (PartitionOptions.Partition > 1)
{
using (var adminClient = new AdminClient(producer.Handle))
{
try
{
adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = TopicName, NumPartitions = 1, ReplicationFactor = 1 } }).Wait();
}
catch (AggregateException ex)
{
//{Confluent.Kafka.Admin.CreateTopicsException: An error occurred creating topics: [jt809]: [Topic 'jt809' already exists.].}
if (ex.InnerException is Confluent.Kafka.Admin.CreateTopicsException exception)
{

}
else
{
//记录日志
//throw ex.InnerException;
}
}
try
{
//topic IncreaseTo 只增不减
adminClient.CreatePartitionsAsync(
new List<PartitionsSpecification>
{
new PartitionsSpecification
{
IncreaseTo = PartitionOptions.Partition,
Topic=TopicName
}
}
).Wait();
}
catch (AggregateException ex)
{
//记录日志
// throw ex.InnerException;
}
}
}
}
}

protected abstract IProducer<string, T> CreateProducer();

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);

}

protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
producer.Dispose();
}
_disposed = true;
}

public void ProduceAsync(string msgId, string vno_color, T data)
{
if (_disposed) return;
if (PartitionOptions != null)
{
if (PartitionOptions.Partition > 1)
{
if (!TopicPartitionCache.TryGetValue(vno_color, out TopicPartition topicPartition))
{
topicPartition = new TopicPartition(TopicName, new Partition(ProducerPartitionFactory.CreatePartition(TopicName, msgId, vno_color)));
TopicPartitionCache.TryAdd(vno_color, topicPartition);
}
producer.ProduceAsync(topicPartition, new Message<string, T>
{
Key = msgId,
Value = data
});
}
else
{
producer.ProduceAsync(TopicName, new Message<string, T>
{
Key = msgId,
Value = data
});
}
}
else
{
producer.ProduceAsync(TopicName, new Message<string, T>
{
Key = msgId,
Value = data
});
}
}

~JT809Producer()
{
Dispose(false);
}
}
}

+ 41
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs Datei anzeigen

@@ -0,0 +1,41 @@
using Confluent.Kafka;
using Google.Protobuf;
using JT809.GrpcProtos;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;

namespace JT809.KafkaService
{
public sealed class JT809_GpsPositio_Producer : JT809Producer<JT809GpsPosition>
{
protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; }

protected override ProducerConfig ProducerConfig { get; }

protected override JT809PartitionOptions PartitionOptions { get; }

public JT809_GpsPositio_Producer(IOptions<ProducerConfig> producerConfigAccessor)
: this(producerConfigAccessor,null, null )
{

}

public JT809_GpsPositio_Producer(
IOptions<ProducerConfig> producerConfigAccessor,
IJT809ProducerPartitionFactory partitionFactory,
IOptions<JT809PartitionOptions> partitionAccessor
)
{
ProducerPartitionFactory = partitionFactory;
ProducerConfig = producerConfigAccessor?.Value;
PartitionOptions = partitionAccessor?.Value;
}

protected override IProducer<string, JT809GpsPosition> CreateProducer()
{
return new ProducerBuilder<string, JT809GpsPosition>(ProducerConfig)
.SetValueSerializer((position) => position.ToByteArray())
.Build();
}
}
}

+ 120
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs Datei anzeigen

@@ -0,0 +1,120 @@
using Confluent.Kafka;
using Google.Protobuf;
using JT809.GrpcProtos;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace JT809.KafkaService
{
public class JT809_GpsPosition_Consumer : IJT808ConsumerOfT<JT809GpsPosition>
{
public CancellationTokenSource Cts => new CancellationTokenSource();

public string TopicName => JT809Constants.JT809TopicName;

private readonly ILogger<JT809_GpsPosition_Consumer> logger;

private readonly List<TopicPartition> topicPartitionList;

private readonly List<IConsumer<string, JT809GpsPosition>> consumers;

private readonly JT809PartitionOptions partition;

public JT809_GpsPosition_Consumer(
IOptions<JT809PartitionOptions> partitionAccessor,
IOptions<ConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
{
partition = partitionAccessor.Value;
logger = loggerFactory.CreateLogger<JT809_GpsPosition_Consumer>();
topicPartitionList = new List<TopicPartition>();
consumers = new List<IConsumer<string, JT809GpsPosition>>();
for (int i=0;i< partition.Partition; i++)
{
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i)));
consumers.Add(new ConsumerBuilder<string, JT809GpsPosition>(consumerConfigAccessor.Value)
.SetErrorHandler((consumer, error) => {
logger.LogError(error.Reason);
})
.SetValueDeserializer((data, isNull) => {
if (isNull) return default;
return new MessageParser<JT809GpsPosition>(() => new JT809GpsPosition())
.ParseFrom(data.ToArray());
})
.Build());
}
}

public JT809_GpsPosition_Consumer(
IOptions<ConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory):this(new JT809PartitionOptions (), consumerConfigAccessor, loggerFactory)
{

}

public void OnMessage(Action<(string MsgId, JT809GpsPosition data)> callback)
{
logger.LogDebug($"consumers:{consumers.Count},topicPartitionList:{topicPartitionList.Count}");
for (int i = 0; i < consumers.Count; i++)
{
Task.Factory.StartNew((num) =>
{
int n = (int)num;
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
consumers[n].Assign(topicPartitionList[n]);
var data = consumers[n].Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Key, data.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
}
catch (Exception ex)
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
}
}
}, i, Cts.Token);
}
}

public void Subscribe()
{
//仅有一个分区才需要订阅
if (topicPartitionList.Count == 1)
{
consumers[0].Subscribe(TopicName);
}
}

public void Unsubscribe()
{
consumers.ForEach(consumer => consumer.Unsubscribe());
}

public void Dispose()
{
Cts.Cancel();
consumers.ForEach(consumer => {
consumer.Close();
consumer.Dispose();
});
}
}
}

+ 25
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Partitions/JT809GpsPositionProducerPartitionFactoryImpl.cs Datei anzeigen

@@ -0,0 +1,25 @@
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.KafkaService.Partitions
{
public class JT809GpsPositionProducerPartitionFactoryImpl : IJT809ProducerPartitionFactory
{
private readonly JT809PartitionOptions partition;

public JT809GpsPositionProducerPartitionFactoryImpl(IOptions<JT809PartitionOptions> partitionAccessor)
{
partition = partitionAccessor.Value;
}

public int CreatePartition(string topicName, string msgId, string vno_color)
{
var key1Byte1 = JT809HashAlgorithm.ComputeMd5(vno_color);
var p = JT809HashAlgorithm.Hash(key1Byte1, 2) % partition.Partition;
return (int)p;
}
}
}

+ 19
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs Datei anzeigen

@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT809.PubSub.Abstractions
{
public interface IJT809Consumer : IJT809PubSub, IJT808ConsumerOfT<byte[]>
{

}
public interface IJT808ConsumerOfT<T> :IDisposable
{
void OnMessage(Action<(string MsgId, T data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
}
}

+ 21
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Producer.cs Datei anzeigen

@@ -0,0 +1,21 @@
using System;
using System.Threading.Tasks;

namespace JT809.PubSub.Abstractions
{
public interface IJT809Producer: IJT809PubSub, IJT809ProducerOfT<byte[]>
{
}

public interface IJT809ProducerOfT<T>: IDisposable
{
/// <summary>
///
/// </summary>
/// <param name="msgId">消息Id</param>
/// <param name="vno_color">车牌号+车牌颜色</param>
/// <param name="data">hex data</param>
void ProduceAsync(string msgId, string vno_color, T data);
}
}

+ 17
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809ProducerPartitionFactory.cs Datei anzeigen

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.PubSub.Abstractions
{
/// <summary>
/// jt809生产者分区工厂
/// 分区策略:
/// 1.可以根据(车牌号+颜色)进行分区
/// 2.可以根据msgId(消息Id)+(车牌号+颜色)进行分区
/// </summary>
public interface IJT809ProducerPartitionFactory
{
int CreatePartition(string topicName, string msgId, string vno_color);
}
}

+ 7
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809PubSub.cs Datei anzeigen

@@ -0,0 +1,7 @@
namespace JT809.PubSub.Abstractions
{
public interface IJT809PubSub
{
string TopicName { get; }
}
}

+ 11
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809.PubSub.Abstractions.csproj Datei anzeigen

@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
</ItemGroup>

</Project>

+ 11
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809Constants.cs Datei anzeigen

@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.PubSub.Abstractions
{
public static class JT809Constants
{
public const string JT809TopicName = "jt809";
}
}

+ 35
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809HashAlgorithm.cs Datei anzeigen

@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Security.Cryptography;

namespace JT809.PubSub.Abstractions
{
public class JT809HashAlgorithm
{
/// <summary>
/// 使用Ketama
/// </summary>
/// <param name="digest"></param>
/// <param name="nTime"></param>
/// <returns></returns>
public static long Hash(byte[] digest, int nTime=1)
{
long rv = ((long)(digest[3 + nTime * 4] & 0xFF) << 24)
| ((long)(digest[2 + nTime * 4] & 0xFF) << 16)
| ((long)(digest[1 + nTime * 4] & 0xFF) << 8)
| ((long)digest[0 + nTime * 4] & 0xFF);
return rv & 0xffffffffL;
}

public static byte[] ComputeMd5(string key)
{
using (MD5 md5 = new MD5CryptoServiceProvider())
{
byte[] keyBytes = md5.ComputeHash(Encoding.UTF8.GetBytes(key));
md5.Clear();
return keyBytes;
}
}
}
}

+ 14
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs Datei anzeigen

@@ -0,0 +1,14 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.PubSub.Abstractions
{
public class JT809PartitionOptions:IOptions<JT809PartitionOptions>
{
public int Partition { get; set; } = 1;

public JT809PartitionOptions Value => this;
}
}

+ 28
- 2
src/JT809.DotNetty.sln Datei anzeigen

@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.28010.2016
# Visual Studio Version 16
VisualStudioVersion = 16.0.28803.156
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.DotNetty.Core", "JT809.DotNetty.Core\JT809.DotNetty.Core.csproj", "{0291C1D6-B4C6-4E7E-984B-0BAFB238727D}"
EndProject
@@ -15,6 +15,16 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9EFC0937-851
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.Protocol", "JT809.Protocol\src\JT809.Protocol\JT809.Protocol.csproj", "{3D7271A3-02CF-4DF4-AD3C-1BF248D6D7CA}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Simples", "Simples", "{3C761892-4ED8-42D2-96CF-F76041D17EC1}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Superior", "Superior", "{E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.PubSub.Abstractions", "JT809.DotNetty.Simples\Superior\JT809.PubSub.Abstractions\JT809.PubSub.Abstractions.csproj", "{5F8AFD67-FDA8-40A6-A655-FD855E2CCF26}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.GrpcProtos", "JT809.DotNetty.Simples\Superior\JT809.GrpcProtos\JT809.GrpcProtos.csproj", "{D64F2F77-DC0C-4120-80DA-45012A794CDF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaService", "JT809.DotNetty.Simples\Superior\JT809.KafkaService\JT809.KafkaService.csproj", "{8119D905-241F-4EFF-B300-1FB474B8C665}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -37,6 +47,18 @@ Global
{3D7271A3-02CF-4DF4-AD3C-1BF248D6D7CA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3D7271A3-02CF-4DF4-AD3C-1BF248D6D7CA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3D7271A3-02CF-4DF4-AD3C-1BF248D6D7CA}.Release|Any CPU.Build.0 = Release|Any CPU
{5F8AFD67-FDA8-40A6-A655-FD855E2CCF26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5F8AFD67-FDA8-40A6-A655-FD855E2CCF26}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5F8AFD67-FDA8-40A6-A655-FD855E2CCF26}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5F8AFD67-FDA8-40A6-A655-FD855E2CCF26}.Release|Any CPU.Build.0 = Release|Any CPU
{D64F2F77-DC0C-4120-80DA-45012A794CDF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D64F2F77-DC0C-4120-80DA-45012A794CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D64F2F77-DC0C-4120-80DA-45012A794CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D64F2F77-DC0C-4120-80DA-45012A794CDF}.Release|Any CPU.Build.0 = Release|Any CPU
{8119D905-241F-4EFF-B300-1FB474B8C665}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8119D905-241F-4EFF-B300-1FB474B8C665}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8119D905-241F-4EFF-B300-1FB474B8C665}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8119D905-241F-4EFF-B300-1FB474B8C665}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -44,6 +66,10 @@ Global
GlobalSection(NestedProjects) = preSolution
{D4E18559-C429-416F-9399-42C0E604D27B} = {DD4611CF-79A9-45C7-91EB-1E84D22B7D07}
{3D7271A3-02CF-4DF4-AD3C-1BF248D6D7CA} = {9EFC0937-8515-4EFB-8FE7-558C6FC8860C}
{E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} = {3C761892-4ED8-42D2-96CF-F76041D17EC1}
{5F8AFD67-FDA8-40A6-A655-FD855E2CCF26} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
{D64F2F77-DC0C-4120-80DA-45012A794CDF} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
{8119D905-241F-4EFF-B300-1FB474B8C665} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D}


Laden…
Abbrechen
Speichern