Zookeeper——客户端API的相关方法操作

1.前言

2.实操步骤

2.1 创建zk客户端,完成与服务端的连接

2.2 创建节点信息

2.3 获取子结点并监听节点变化

2.4 判断节点是否存在

3.浅谈写数据原理

3.1 写流程之写入请求直接发送给Leader节点

3.2 写流程之写入请求直接发送给Follower节点


首先,在上一篇博客中,主要是对zookeeper集群的相关操作,那么由于我的笔记本是 8G 内存的,所以开三台Linux还是可以的,但是再开个IDEA了话就炸了,所以我这里针对zookeeper和Java API的操作,就切换成zookeeper单机模式了,也就是只启动一台zookeeper。

由于之前配置的是zookeeper集群,现在转为单机版,所以需要把 zoo.cfg 配置文件中关于集群的那些注释掉:????????????


首先建一个maven项目,pom文件中添加如下依赖。

    <dependencies>         <dependency>             <groupId>junit</groupId>             <artifactId>junit</artifactId>             <version>RELEASE</version>         </dependency>         <dependency>             <groupId>org.apache.logging.log4j</groupId>             <artifactId>log4j-core</artifactId>             <version>2.8.2</version>         </dependency>         <dependency>             <groupId>org.apache.zookeeper</groupId>             <artifactId>zookeeper</artifactId>             <version>3.5.7</version>         </dependency>     </dependencies>
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout  log4j.appender.stdout=org.apache.log4j.ConsoleAppender  log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  log4j.appender.logfile=org.apache.log4j.FileAppender  log4j.appender.logfile.File=target/spring.log  log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

2.1 创建zk客户端,完成与服务端的连接

package com.szh.zk;  import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test;  import java.io.IOException; import java.util.List;  /**  *  */ public class ZookeeperClient {      //单机模式     private static String connectString = "192.168.40.130:2181";     //集群模式 //    private static String connectString = "zk101:2181,zk102:2181,zk103:2181";     private static int sessionTimeout = 30000;     private ZooKeeper zkClient = null;      /**      * 创建ZooKeeper客户端,完成服务端与客户端之间的连接      * connectString: 具体连接的那台zookeeper服务器的地址      * sessionTimeout: 客户端和服务端直接连接的超时时限      * watcher: 监听器相关设置      */     @Test     public void init() throws IOException {         zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {             @Override             public void process(WatchedEvent watchedEvent) {              }         });     } }

2.2 创建节点信息

package com.szh.zk;  import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test;  import java.io.IOException; import java.util.List;  /**  *  */ public class ZookeeperClient {      //单机模式     private static String connectString = "192.168.40.130:2181";     //集群模式 //    private static String connectString = "zk101:2181,zk102:2181,zk103:2181";     private static int sessionTimeout = 30000;     private ZooKeeper zkClient = null;      /**      * 创建ZooKeeper客户端,完成服务端与客户端之间的连接      * connectString: 具体连接的那台zookeeper服务器的地址      * sessionTimeout: 客户端和服务端直接连接的超时时限      * watcher: 监听器相关设置      */     @Before     public void init2() throws IOException {         zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {             @Override             public void process(WatchedEvent watchedEvent) {              }         });     }      /**      * 创建节点相关信息      * String path: 要在哪个节点路径下创建新的节点      * byte[] data: 节点中存储的具体值      * List<ACL> acl: 权限控制相关内容, OPEN_ACL_UNSAFE表示任何人都可以访问      * CreateMode createMode: 创建节点的类型(永久、短暂等), PERSISTENT表示永久节点      */     @Test     public void create() throws KeeperException, InterruptedException {         String node = zkClient.create("/szh", "Java开发".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);         System.out.println(node);     } }

2.3 获取子结点并监听节点变化

package com.szh.zk;  import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test;  import java.io.IOException; import java.util.List;  /**  *  */ public class ZookeeperClient {      //单机模式     private static String connectString = "192.168.40.130:2181";     //集群模式 //    private static String connectString = "zk101:2181,zk102:2181,zk103:2181";     private static int sessionTimeout = 30000;     private ZooKeeper zkClient = null;      /**      * 创建ZooKeeper客户端,完成服务端与客户端之间的连接      * connectString: 具体连接的那台zookeeper服务器的地址      * sessionTimeout: 客户端和服务端直接连接的超时时限      * watcher: 监听器相关设置      */     @Before     public void init2() throws IOException {         zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {             @Override             public void process(WatchedEvent watchedEvent) {                 //收到事件通知后的回调函数(用户的业务逻辑)                 System.out.println("-----------------------------------");                 List<String> children = null;                 try {                     children = zkClient.getChildren("/", true);                 } catch (KeeperException | InterruptedException e) {                     e.printStackTrace();                 }                 children.forEach(System.out::println);             }         });     }      /**      * 获取子结点,并监听节点的变化      * String path: 监听哪个路径下的节点      * boolean watch: true表示采用监听器,客户端相关路径下的节点一旦发生变化,监听器就会触发相应的回调方法,再次启动监听      */     @Test     public void getChildren() throws KeeperException, InterruptedException {         List<String> children = zkClient.getChildren("/", true);         children.forEach(System.out::println);         Thread.sleep(Long.MAX_VALUE);     } }

基于上面创建节点的API测试之后,此时zk客户端中是存在 szh 这个节点的。

下面,我们到 zk客户端命令行中,创建新的节点,看看这里监听如何?

从上面的运行结果中可以看到,在客户端中实时的创建新的节点是可以被监听到的。

下面删除一个节点,这边监听仍然正常执行。

2.4 判断节点是否存在

package com.szh.zk;  import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test;  import java.io.IOException; import java.util.List;  /**  *  */ public class ZookeeperClient {      //单机模式     private static String connectString = "192.168.40.130:2181";     //集群模式 //    private static String connectString = "zk101:2181,zk102:2181,zk103:2181";     private static int sessionTimeout = 30000;     private ZooKeeper zkClient = null;      /**      * 创建ZooKeeper客户端,完成服务端与客户端之间的连接      * connectString: 具体连接的那台zookeeper服务器的地址      * sessionTimeout: 客户端和服务端直接连接的超时时限      * watcher: 监听器相关设置      */     @Test     public void init() throws IOException {         zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {             @Override             public void process(WatchedEvent watchedEvent) {              }         });     }      @Before     public void init2() throws IOException {         zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {             @Override             public void process(WatchedEvent watchedEvent) {              }         });     }      /**      * 判断节点是否存在      * String path: 判断哪个路径下的节点是否存在      * boolean watch: false表示不开启监听器功能      */     @Test     public void exist() throws KeeperException, InterruptedException {         Stat stat = zkClient.exists("/szh", false);         System.out.println(stat != null ? "节点存在" : "节点不存在");     } }

基于上面的API测试,我们知道 szh 这个节点是存在的,所以上面的运行结果是 节点存在。

然后将 szh 节点删除,自然就不存在了。


3.1 写流程之写入请求直接发送给Leader节点

3.2 写流程之写入请求直接发送给Follower节点