From ca6bed23fa621f7c0cbaa4b199272abc53525b53 Mon Sep 17 00:00:00 2001 From: SmallChi <564952747@qq.com> Date: Mon, 26 Nov 2018 00:38:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=AE=80=E5=8D=95tcp?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E5=AE=A2=E6=88=B7=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Codecs/JT808ClientDecoder.cs | 59 +++++++++++++++++++ src/JT808.DotNetty/JT808SimpleTcpClient.cs | 53 +++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs create mode 100644 src/JT808.DotNetty/JT808SimpleTcpClient.cs diff --git a/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs b/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs new file mode 100644 index 0000000..662ebff --- /dev/null +++ b/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs @@ -0,0 +1,59 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Channels; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using JT808.Protocol; +using JT808.DotNetty.Internal; +using JT808.DotNetty.Interfaces; + +namespace JT808.DotNetty.Codecs +{ + /// + /// JT808客户端解码(测试客户端) + /// + internal class JT808ClientDecoder : ByteToMessageDecoder + { + private static readonly ILogger logger=new LoggerFactory().CreateLogger(); + + private static readonly JT808AtomicCounterService jT808AtomicCounterService=new JT808AtomicCounterService (); + + protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) + { + byte[] buffer = new byte[input.Capacity + 2]; + try + { + input.ReadBytes(buffer, 1, input.Capacity); + buffer[0] = JT808Package.BeginFlag; + buffer[input.Capacity + 1] = JT808Package.EndFlag; + JT808Package jT808Package = JT808Serializer.Deserialize(buffer); + output.Add(jT808Package); + jT808AtomicCounterService.MsgSuccessIncrement(); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); + } + } + catch (JT808.Protocol.Exceptions.JT808Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + buffer); + } + } + catch (Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + buffer); + } + } + } + } +} diff --git a/src/JT808.DotNetty/JT808SimpleTcpClient.cs b/src/JT808.DotNetty/JT808SimpleTcpClient.cs new file mode 100644 index 0000000..fcadd98 --- /dev/null +++ b/src/JT808.DotNetty/JT808SimpleTcpClient.cs @@ -0,0 +1,53 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Handlers; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.DotNetty +{ + public class JT808SimpleTcpClient + { + private Bootstrap cb; + + private MultithreadEventLoopGroup clientGroup; + + private IChannel clientChannel; + + public JT808SimpleTcpClient(EndPoint remoteAddress) + { + clientGroup = new MultithreadEventLoopGroup(1); + cb = new Bootstrap() + .Group(clientGroup) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Handler(new ActionChannelInitializer(channel => + { + channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); + })); + clientChannel = cb.ConnectAsync(remoteAddress).Result; + } + + public void WriteAsync(byte[] data) + { + clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + } + + public void Down() + { + this.clientChannel?.CloseAsync().Wait(); + Task.WaitAll(this.clientGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))); + } + } +}