Netty集成ProtoBuf开发私有协议

私有协议

广义上区分,通信协议可以分为公有协议和私有协议。由于私有协议的灵活性,它往往会在某个公司或者组织内部使用,按需定制,也因为如此,升级起来会非常方便,灵活性好。绝大多数的私有协议传输层都基于TCP/IP,所以利用Netty的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。

通信模型

Netty集成ProtoBuf开发私有协议

(1) Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息;
(2) Netty 协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息:
(3)链路建立成功之后,客户端发送业务消息;
(4)链路成功之后,服务端发送心跳消息;
(5)链路建立成功之后,客户端发送心跳消息;
(6)链路建立成功之后,服务端发送业务消息;
(7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。

ProtoBuf数据格式

syntax = "proto3"; option java_package = "com.fy.protobuf"; option java_outer_classname="CustomMessageData";  message MessageData{     int64 length = 1;     Content content = 2;     enum DataType {         REQ_LOGIN = 0;  //上线登录验证环节 等基础信息上报         RSP_LOGIN = 1;  //返回上线登录状态与基础信息         PING = 2;  //心跳         PONG = 3;  //心跳         REQ_ACT = 4;  //动作请求         RSP_ACT = 5;  //动作响应         REQ_CMD = 6;  //指令请求         RSP_CMD = 7;  //指令响应         REQ_LOG = 8 ;//日志请求         RSP_LOG = 9;  //日志响应     }     DataType order = 3;     message Content{         int64 contentLength = 1;         string data = 2;     } }

开发步骤

tip????下列步骤有点吃力的小伙伴可以看看之前的文章:https://blog.csdn.net/kunfeisang5551/article/details/107957256

1、在D盘protobuf路径下执行命令:protoc.exe --java_out=D:\protobuf CustomMsg.proto

2、将生成的文件拷贝到项目中

开始Coding~

1、新建maven项目,引入依赖

<dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.51.Final</version> </dependency> <dependency>     <groupId>com.google.protobuf</groupId>     <artifactId>protobuf-java</artifactId>     <version>3.11.0</version> </dependency>

2、创建服务端启动代码

public class CustomServer {     public void bind(int port) {         EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workGroup = new NioEventLoopGroup();         try {             ServerBootstrap bootstrap = new ServerBootstrap();             bootstrap                     .group(bossGroup, workGroup)                     .channel(NioServerSocketChannel.class)                     .handler(new LoggingHandler(LogLevel.INFO))                     .childHandler(new ChannelInitializer<SocketChannel>() {                         @Override                         protected void initChannel(SocketChannel socketChannel) throws Exception {                             socketChannel.pipeline()                                     //消息头定长                                     .addLast(new ProtobufVarint32FrameDecoder())                                     //解码指定的消息类型                                     .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance()))                                     //消息头设置长度                                     .addLast(new ProtobufVarint32LengthFieldPrepender())                                     //解码                                     .addLast(new ProtobufEncoder())                                     //心跳检测,超过设置的时间将会抛出异常ReadTimeoutException                                     .addLast(new ReadTimeoutHandler(8))                                     //消息处理                                     .addLast(new CustomServerHandler())                                     //心跳响应                                     .addLast(new CustomServerHeartBeatHandler());                         }                     });             // 绑定端口同步等待启动成功             ChannelFuture sync = bootstrap.bind(port).sync();              // 等待服务监听端口关闭             sync.channel().closeFuture().sync();         } catch (Exception e) {             e.printStackTrace();         } finally {             bossGroup.shutdownGracefully();             workGroup.shutdownGracefully();         }     } }

3、创建服务端消息处理代码

public class CustomServerHandler extends ChannelInboundHandlerAdapter {      private String[] whiteIPv4List = {"127.0.0.1", "192.168.1.188"};     public static ConcurrentHashMap nodeCheck = new ConcurrentHashMap();      @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;         if (messageData.getOrder() == CustomMessageData.MessageData.DataType.UNRECOGNIZED) {             // 无法识别的消息类型             ctx.close();         }          if (messageData.getOrder() == CustomMessageData.MessageData.DataType.REQ_LOGIN) {             // 检查重复登录             String nodeIndex = ctx.channel().remoteAddress().toString();             if (nodeCheck.contains(nodeIndex)) {                 // 重复登录                 ctx.writeAndFlush(builderResp(false));                 return;             } else {                 InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();                 String ip = socketAddress.getAddress().getHostAddress();                 boolean isOk = false;                 // 检查白名单                 for (String s : whiteIPv4List) {                     if (s.equals(ip)) {                         isOk = true;                         break;                     }                 }                 // 成功响应                 CustomMessageData.MessageData responseData = isOk ? builderResp(true) : builderResp(false);                 if (isOk) {                     nodeCheck.put(nodeIndex, true);                 }                 ctx.writeAndFlush(responseData);             }         } else {             //心跳消息处理             ctx.fireChannelRead(msg);         }     }      @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {         nodeCheck.remove(ctx.channel().remoteAddress().toString());         if (ctx.channel().isActive()) {             ctx.close();         }     }      public CustomMessageData.MessageData builderResp(boolean isOk) {         String r = isOk ? "SUCCESS" : "FAILED";         CustomMessageData.MessageData.Content responseContent = CustomMessageData.MessageData.Content.newBuilder().setData(r).setContentLength(r.length()).build();         CustomMessageData.MessageData responseData = CustomMessageData.MessageData.newBuilder().setOrder(CustomMessageData.MessageData.DataType.RSP_LOGIN).setContent(responseContent).build();         return responseData;     } }

4、创建服务端心跳响应代码

public class CustomServerHeartBeatHandler extends ChannelInboundHandlerAdapter {      @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;         if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PING) {             CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder()                     .setOrder(CustomMessageData.MessageData.DataType.PONG).build();             System.out.println("Send-Client:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));             ctx.writeAndFlush(req);         } else {             ctx.fireChannelRead(msg);         }     } }

5、创建客户端启动代码

public class CustomClient {     public void bind(int port) {         EventLoopGroup group = new NioEventLoopGroup();         try {             Bootstrap b = new Bootstrap();             b.group(group)                     .channel(NioSocketChannel.class)                     .handler(new ChannelInitializer<SocketChannel>() {                         @Override                         protected void initChannel(SocketChannel socketChannel) throws Exception {                             socketChannel.pipeline()                                     .addLast(new ProtobufVarint32FrameDecoder())                                     .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance()))                                     .addLast(new ProtobufVarint32LengthFieldPrepender())                                     .addLast(new ProtobufEncoder())                                     // 消息处理                                     .addLast(new CustomClientHandler())                                     // 心跳响应                                     .addLast(new CustomClientHeartBeatHandler());                         }                     });             ChannelFuture f = b.connect("127.0.0.1", port).sync();              f.channel().closeFuture().sync();         } catch (Exception e) {             e.printStackTrace();         } finally {             // 短线重连 定时5秒             group.execute(() -> {                 try {                     TimeUnit.MILLISECONDS.sleep(5);                     bind(port);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }); //            group.shutdownGracefully();         }     } }

6、创建客户端消息处理代码

这里的逻辑主要是通道激活后马上发送业务消息,然后保持心跳

public class CustomClientHandler extends ChannelInboundHandlerAdapter {      @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {         CustomMessageData.MessageData reqData = CustomMessageData                 .MessageData                 .newBuilder()                 .setOrder(CustomMessageData.MessageData.DataType.REQ_LOGIN)                 .build();         ctx.channel().writeAndFlush(reqData);     }      @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         CustomMessageData.MessageData respData = (CustomMessageData.MessageData) msg;         if (respData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) {             // 响应登录请求处理逻辑             boolean equals = respData.getContent().getData().equals("SUCCESS");             if (equals) {                 System.out.println("Receive-Server:LoginSuccess,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));                 System.out.println(respData.toString());                 // 传递下一个handler                 ctx.fireChannelRead(msg);             } else {                 // 登录失败                 if (ctx.channel().isActive()) {                     ctx.close();                 }             }         } else {             // 响应心跳处理逻辑             ctx.fireChannelRead(msg);         }      }      @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {         super.exceptionCaught(ctx, cause);         if (ctx.channel().isActive()) {             ctx.close();         }     } }

7、创建客户端心跳保持代码

public class CustomClientHeartBeatHandler extends ChannelInboundHandlerAdapter {      private static ScheduledFuture heartbeatFuture;      @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;         if (messageData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) {             // 登录成功后保持心跳 间隔为5秒             heartbeatFuture = ctx.executor().scheduleAtFixedRate(() -> {                 CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder()                         .setOrder(CustomMessageData.MessageData.DataType.PING).build();                 System.out.println("Send-Server:PING,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));                 ctx.writeAndFlush(req);             }, 0, 5, TimeUnit.SECONDS);         } else if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PONG) {             System.out.println("Receive-Server:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));             System.out.println();         } else {             ctx.fireChannelRead(msg);         }     }      @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {         // 发生异常就取消心跳保持         if (heartbeatFuture != null) {             heartbeatFuture.cancel(true);             heartbeatFuture = null;         }         ctx.fireExceptionCaught(cause);     } }

8、启动服务端

public class Server {     public static void main(String[] args) throws Exception {         new CustomServer().bind(8080);     } }

9、启动客户端

public class Client {     public static void main(String[] args) {         new CustomClient().bind(8080);     } }

控制台打印

1、客户端

Netty集成ProtoBuf开发私有协议

Receive-Server:LoginSuccess,time:2020-08-12 17:31:47 content {   contentLength: 7   data: "SUCCESS" } order: RSP_LOGIN  Send-Server:PING,time:2020-08-12 17:31:47 Receive-Server:PONG,time:2020-08-12 17:31:47  Send-Server:PING,time:2020-08-12 17:31:52 Receive-Server:PONG,time:2020-08-12 17:31:52  Send-Server:PING,time:2020-08-12 17:31:57 Receive-Server:PONG,time:2020-08-12 17:31:57  Send-Server:PING,time:2020-08-12 17:32:02 Receive-Server:PONG,time:2020-08-12 17:32:02

我们可以看到,当客户端发送登录请求后,服务端响应登录成功消息,然后交替打印心跳保持信息,间隔为5秒。

2、服务端

Netty集成ProtoBuf开发私有协议

Send-Client:PONG,time:2020-08-12 17:31:47 Send-Client:PONG,time:2020-08-12 17:31:52 Send-Client:PONG,time:2020-08-12 17:31:57 Send-Client:PONG,time:2020-08-12 17:32:02 Send-Client:PONG,time:2020-08-12 17:32:07

服务端响应登录请求后交替打印心跳保持信息。

3、测试服务端异常

我们先停掉服务端,看看客户端有啥反应,客户端日志:

Connection refused: no further information

客户端5秒打印一次异常信息,说明短线重连逻辑正常

我们接着再启动服务端,看看客户端有啥反应

Netty集成ProtoBuf开发私有协议

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080 Caused by: java.net.ConnectException: Connection refused: no further information 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) 	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 	at java.lang.Thread.run(Thread.java:748) Receive-Server:LoginSuccess,time:2020-08-12 17:44:15 content {   contentLength: 7   data: "SUCCESS" } order: RSP_LOGIN  Send-Server:PING,time:2020-08-12 17:44:15 Receive-Server:PONG,time:2020-08-12 17:44:15  Send-Server:PING,time:2020-08-12 17:44:20 Receive-Server:PONG,time:2020-08-12 17:44:20

可以看到由异常转为正常啦~

通过测试可以验证是否符合私有协议的约定:

(1)客户端是否能够正常发起重连:
(2)重连成功之后,不再重连:
(3)断连期间,心跳定时器停止工作,不再发送心跳请求消息;
(4)服务端重启成功之后,允许客户端重新登录;
(5)服务端重启成功之后,客户端能够重连和握手成功:
(6)重连成功之后,双方的心跳能够正常互发。
(7)性能指标:重连期间,客户端资源得到了正常回收,不会导致句柄等资源泄漏。

GitHub服务端地址:https://github.com/GoodBoy2333/netty-server-maven.git

GitHub客户端地址:https://github.com/GoodBoy2333/netty-client-maven.git