diff --git a/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs b/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs new file mode 100644 index 0000000..662ebff --- /dev/null +++ b/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs @@ -0,0 +1,59 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Channels; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using JT808.Protocol; +using JT808.DotNetty.Internal; +using JT808.DotNetty.Interfaces; + +namespace JT808.DotNetty.Codecs +{ + /// + /// JT808客户端解码(测试客户端) + /// + internal class JT808ClientDecoder : ByteToMessageDecoder + { + private static readonly ILogger logger=new LoggerFactory().CreateLogger(); + + private static readonly JT808AtomicCounterService jT808AtomicCounterService=new JT808AtomicCounterService (); + + protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) + { + byte[] buffer = new byte[input.Capacity + 2]; + try + { + input.ReadBytes(buffer, 1, input.Capacity); + buffer[0] = JT808Package.BeginFlag; + buffer[input.Capacity + 1] = JT808Package.EndFlag; + JT808Package jT808Package = JT808Serializer.Deserialize(buffer); + output.Add(jT808Package); + jT808AtomicCounterService.MsgSuccessIncrement(); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); + } + } + catch (JT808.Protocol.Exceptions.JT808Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + buffer); + } + } + catch (Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + buffer); + } + } + } + } +} diff --git a/src/JT808.DotNetty/JT808SimpleTcpClient.cs b/src/JT808.DotNetty/JT808SimpleTcpClient.cs new file mode 100644 index 0000000..fcadd98 --- /dev/null +++ b/src/JT808.DotNetty/JT808SimpleTcpClient.cs @@ -0,0 +1,53 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Handlers; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.DotNetty +{ + public class JT808SimpleTcpClient + { + private Bootstrap cb; + + private MultithreadEventLoopGroup clientGroup; + + private IChannel clientChannel; + + public JT808SimpleTcpClient(EndPoint remoteAddress) + { + clientGroup = new MultithreadEventLoopGroup(1); + cb = new Bootstrap() + .Group(clientGroup) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Handler(new ActionChannelInitializer(channel => + { + channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); + })); + clientChannel = cb.ConnectAsync(remoteAddress).Result; + } + + public void WriteAsync(byte[] data) + { + clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + } + + public void Down() + { + this.clientChannel?.CloseAsync().Wait(); + Task.WaitAll(this.clientGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))); + } + } +}