广义上区分,通信协议可以分为公有协议和私有协议。由于私有协议的灵活性,它往往会在某个公司或者组织内部使用,按需定制,也因为如此,升级起来会非常方便,灵活性好。绝大多数的私有协议传输层都基于TCP/IP,所以利用Netty的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。
(1) Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息;
(2) Netty 协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息:
(3)链路建立成功之后,客户端发送业务消息;
(4)链路成功之后,服务端发送心跳消息;
(5)链路建立成功之后,客户端发送心跳消息;
(6)链路建立成功之后,服务端发送业务消息;
(7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。
syntax = "proto3"; option java_package = "com.fy.protobuf"; option java_outer_classname="CustomMessageData"; message MessageData{ int64 length = 1; Content content = 2; enum DataType { REQ_LOGIN = 0; //上线登录验证环节 等基础信息上报 RSP_LOGIN = 1; //返回上线登录状态与基础信息 PING = 2; //心跳 PONG = 3; //心跳 REQ_ACT = 4; //动作请求 RSP_ACT = 5; //动作响应 REQ_CMD = 6; //指令请求 RSP_CMD = 7; //指令响应 REQ_LOG = 8 ;//日志请求 RSP_LOG = 9; //日志响应 } DataType order = 3; message Content{ int64 contentLength = 1; string data = 2; } }
tip????下列步骤有点吃力的小伙伴可以看看之前的文章:https://blog.csdn.net/kunfeisang5551/article/details/107957256
1、在D盘protobuf路径下执行命令:protoc.exe --java_out=D:\protobuf CustomMsg.proto
2、将生成的文件拷贝到项目中
1、新建maven项目,引入依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.51.Final</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.0</version> </dependency>
2、创建服务端启动代码
public class CustomServer { public void bind(int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() //消息头定长 .addLast(new ProtobufVarint32FrameDecoder()) //解码指定的消息类型 .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance())) //消息头设置长度 .addLast(new ProtobufVarint32LengthFieldPrepender()) //解码 .addLast(new ProtobufEncoder()) //心跳检测,超过设置的时间将会抛出异常ReadTimeoutException .addLast(new ReadTimeoutHandler(8)) //消息处理 .addLast(new CustomServerHandler()) //心跳响应 .addLast(new CustomServerHeartBeatHandler()); } }); // 绑定端口同步等待启动成功 ChannelFuture sync = bootstrap.bind(port).sync(); // 等待服务监听端口关闭 sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
3、创建服务端消息处理代码
public class CustomServerHandler extends ChannelInboundHandlerAdapter { private String[] whiteIPv4List = {"127.0.0.1", "192.168.1.188"}; public static ConcurrentHashMap nodeCheck = new ConcurrentHashMap(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg; if (messageData.getOrder() == CustomMessageData.MessageData.DataType.UNRECOGNIZED) { // 无法识别的消息类型 ctx.close(); } if (messageData.getOrder() == CustomMessageData.MessageData.DataType.REQ_LOGIN) { // 检查重复登录 String nodeIndex = ctx.channel().remoteAddress().toString(); if (nodeCheck.contains(nodeIndex)) { // 重复登录 ctx.writeAndFlush(builderResp(false)); return; } else { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = socketAddress.getAddress().getHostAddress(); boolean isOk = false; // 检查白名单 for (String s : whiteIPv4List) { if (s.equals(ip)) { isOk = true; break; } } // 成功响应 CustomMessageData.MessageData responseData = isOk ? builderResp(true) : builderResp(false); if (isOk) { nodeCheck.put(nodeIndex, true); } ctx.writeAndFlush(responseData); } } else { //心跳消息处理 ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { nodeCheck.remove(ctx.channel().remoteAddress().toString()); if (ctx.channel().isActive()) { ctx.close(); } } public CustomMessageData.MessageData builderResp(boolean isOk) { String r = isOk ? "SUCCESS" : "FAILED"; CustomMessageData.MessageData.Content responseContent = CustomMessageData.MessageData.Content.newBuilder().setData(r).setContentLength(r.length()).build(); CustomMessageData.MessageData responseData = CustomMessageData.MessageData.newBuilder().setOrder(CustomMessageData.MessageData.DataType.RSP_LOGIN).setContent(responseContent).build(); return responseData; } }
4、创建服务端心跳响应代码
public class CustomServerHeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg; if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PING) { CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder() .setOrder(CustomMessageData.MessageData.DataType.PONG).build(); System.out.println("Send-Client:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); ctx.writeAndFlush(req); } else { ctx.fireChannelRead(msg); } } }
5、创建客户端启动代码
public class CustomClient { public void bind(int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) // 消息处理 .addLast(new CustomClientHandler()) // 心跳响应 .addLast(new CustomClientHeartBeatHandler()); } }); ChannelFuture f = b.connect("127.0.0.1", port).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 短线重连 定时5秒 group.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(5); bind(port); } catch (InterruptedException e) { e.printStackTrace(); } }); // group.shutdownGracefully(); } } }
6、创建客户端消息处理代码
这里的逻辑主要是通道激活后马上发送业务消息,然后保持心跳
public class CustomClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { CustomMessageData.MessageData reqData = CustomMessageData .MessageData .newBuilder() .setOrder(CustomMessageData.MessageData.DataType.REQ_LOGIN) .build(); ctx.channel().writeAndFlush(reqData); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CustomMessageData.MessageData respData = (CustomMessageData.MessageData) msg; if (respData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) { // 响应登录请求处理逻辑 boolean equals = respData.getContent().getData().equals("SUCCESS"); if (equals) { System.out.println("Receive-Server:LoginSuccess,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); System.out.println(respData.toString()); // 传递下一个handler ctx.fireChannelRead(msg); } else { // 登录失败 if (ctx.channel().isActive()) { ctx.close(); } } } else { // 响应心跳处理逻辑 ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); if (ctx.channel().isActive()) { ctx.close(); } } }
7、创建客户端心跳保持代码
public class CustomClientHeartBeatHandler extends ChannelInboundHandlerAdapter { private static ScheduledFuture heartbeatFuture; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg; if (messageData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) { // 登录成功后保持心跳 间隔为5秒 heartbeatFuture = ctx.executor().scheduleAtFixedRate(() -> { CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder() .setOrder(CustomMessageData.MessageData.DataType.PING).build(); System.out.println("Send-Server:PING,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); ctx.writeAndFlush(req); }, 0, 5, TimeUnit.SECONDS); } else if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PONG) { System.out.println("Receive-Server:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); System.out.println(); } else { ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 发生异常就取消心跳保持 if (heartbeatFuture != null) { heartbeatFuture.cancel(true); heartbeatFuture = null; } ctx.fireExceptionCaught(cause); } }
8、启动服务端
public class Server { public static void main(String[] args) throws Exception { new CustomServer().bind(8080); } }
9、启动客户端
public class Client { public static void main(String[] args) { new CustomClient().bind(8080); } }
1、客户端
Receive-Server:LoginSuccess,time:2020-08-12 17:31:47 content { contentLength: 7 data: "SUCCESS" } order: RSP_LOGIN Send-Server:PING,time:2020-08-12 17:31:47 Receive-Server:PONG,time:2020-08-12 17:31:47 Send-Server:PING,time:2020-08-12 17:31:52 Receive-Server:PONG,time:2020-08-12 17:31:52 Send-Server:PING,time:2020-08-12 17:31:57 Receive-Server:PONG,time:2020-08-12 17:31:57 Send-Server:PING,time:2020-08-12 17:32:02 Receive-Server:PONG,time:2020-08-12 17:32:02
我们可以看到,当客户端发送登录请求后,服务端响应登录成功消息,然后交替打印心跳保持信息,间隔为5秒。
2、服务端
Send-Client:PONG,time:2020-08-12 17:31:47 Send-Client:PONG,time:2020-08-12 17:31:52 Send-Client:PONG,time:2020-08-12 17:31:57 Send-Client:PONG,time:2020-08-12 17:32:02 Send-Client:PONG,time:2020-08-12 17:32:07
服务端响应登录请求后交替打印心跳保持信息。
3、测试服务端异常
我们先停掉服务端,看看客户端有啥反应,客户端日志:
Connection refused: no further information
客户端5秒打印一次异常信息,说明短线重连逻辑正常
我们接着再启动服务端,看看客户端有啥反应
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080 Caused by: java.net.ConnectException: Connection refused: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Receive-Server:LoginSuccess,time:2020-08-12 17:44:15 content { contentLength: 7 data: "SUCCESS" } order: RSP_LOGIN Send-Server:PING,time:2020-08-12 17:44:15 Receive-Server:PONG,time:2020-08-12 17:44:15 Send-Server:PING,time:2020-08-12 17:44:20 Receive-Server:PONG,time:2020-08-12 17:44:20
可以看到由异常转为正常啦~
通过测试可以验证是否符合私有协议的约定:
(1)客户端是否能够正常发起重连:
(2)重连成功之后,不再重连:
(3)断连期间,心跳定时器停止工作,不再发送心跳请求消息;
(4)服务端重启成功之后,允许客户端重新登录;
(5)服务端重启成功之后,客户端能够重连和握手成功:
(6)重连成功之后,双方的心跳能够正常互发。
(7)性能指标:重连期间,客户端资源得到了正常回收,不会导致句柄等资源泄漏。
GitHub服务端地址:https://github.com/GoodBoy2333/netty-server-maven.git
GitHub客户端地址:https://github.com/GoodBoy2333/netty-client-maven.git