Zookeeper整合Java实战,不同客户端使用汇总

Java学习+面试指南:https://javaxiaobear.cn

ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:

  • ZooKeeper官方的Java客户端API。

  • 第三方的Java客户端API,比如Curator。

ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:

  • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。

  • 会话超时之后没有实现重连机制。

  • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。

  • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。

  • 创建节点时如果抛出异常,需要自行检查节点是否存在。

  • 无法实现级联删除。

总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。

1、Zookeeper 原生Java客户端使用

1、搭建服务

1、创建一个maven项目,引入zookeeper client依赖

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.3</version>
</dependency>

注意:保持与服务端版本一致,不然会有很多兼容性的问题

2、学习API

ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来使用ZooKeeper服务。

ZooKeeper常用构造器

ZooKeeper (connectString, sessionTimeout, watcher)
  • connectString:使用逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString 中的一个节点建立连接。

  • sessionTimeout : session timeout时间。

  • watcher:用于接收到来自ZooKeeper集群的事件。

使用 zookeeper 原生 API,连接zookeeper集群

3、代码连接客户端

public class ZkClientDemo {
    private static final  String  CONNECT_STR = "localhost:2181";
    /**
     * 连接地址
     */
    private final static String CLUSTER_CONNECT_STR="ip:2181";

    public static void main(String[] args) throws Exception {

        final CountDownLatch countDownLatch=new CountDownLatch(1);

        ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, watchedEvent -> {
            if(Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()){
                //如果收到了服务端的响应事件,连接成功
                countDownLatch.countDown();
                System.out.println("连接建立");
            }
        });
        System.out.println("连接中");
        countDownLatch.await();
        //CONNECTED
        System.out.println(zooKeeper.getState());

        Stat stat = zooKeeper.exists("/zk",false);
        if(null ==stat){
            //创建持久节点
            zooKeeper.create("/zk","javaxiaobear".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        //持久监听
        zooKeeper.addWatch("/zk",new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event);
            }
        },AddWatchMode.PERSISTENT);


        Thread.sleep(Integer.MAX_VALUE);
    }
}

2、Zookeeper主要方法

  • create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。

  • delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。

  • exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。

  • getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。

  • setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。

  • getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。

  • sync(path):把客户端 session 连接节点和 leader 节点进行同步。

方法特点:

  • 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。

  • 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。

  • 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。

1、连接Linux服务器

ZooKeeper 类通过其构造函数提供连接功能。构造函数的签名如下:

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
  • connectionString:ZooKeeper集群主机,可以为集群,也可为单个。
  • sessionTimeout:会话超时(以毫秒为单位)。
  • watcher:一个实现“Watcher”接口的对象。ZooKeeper集群通过Watcher对象返回连接状态。
  • 让我们创建ZooKeeperConnection类并添加一个方法connect。该连接方法创建一个ZooKeeper对象,所连接到的ZooKeeper集群,然后返回该对象。在这里,CountDownLatch用于停止(等待)主进程,直到客户端与ZooKeeper集群连接为止。 ZooKeeper集群通过Watcher回调返回连接状态。客户端与ZooKeeper集群连接后,将调用Watcher回调,并且Watcher回调调用CountDownLatch的countDown方法释放锁,并在主进程中等待。

代码实现如下:

public class ZkClientDemo {
    private static final  String  CONNECT_STR = "localhost:2181";
    /**
     * 连接地址
     */
    private final static String CLUSTER_CONNECT_STR="ip:2181";

    public static void main(String[] args) throws Exception {

        final CountDownLatch countDownLatch=new CountDownLatch(1);

        ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, watchedEvent -> {
            System.out.println(watchedEvent);
            if(Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()){
                //如果收到了服务端的响应事件,连接成功
                countDownLatch.countDown();
                System.out.println("连接建立");
            }
        });
        System.out.println("连接中");
        countDownLatch.await();
        //CONNECTED
        System.out.println(zooKeeper.getState());
    }
}

**注意:**如果连接,请检查一下是否开启了防火墙,服务器2181端口是否放开了

2、创建节点

create基础的方法用法如下

create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 
  • path:Znode路径。例如:/javaxiaobear,/javaxiaobear/good
  • data:数据存储在指定的Znode点路径
  • acl:要创建的节点的访问控制列表。ZooKeeper API提供了一个静态接口ZooDefs.Ids,以获取一些基本的ACL列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开的znode的acl列表。
  • createMode:节点的类型,临时的,序列的或两者兼而有之。这是一个枚举。

代码实现如下:

/**
 * 创建节点
 * @param parentPath 父节点路径
 * @param nodeName 节点名称
 * @param data 数据
 */
public void createNode(String parentPath, String nodeName, String data) throws Exception{
    zooKeeper.create(parentPath + "/" + nodeName ,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
3、判断节点是否存在

ZooKeeper类提供了exists方法来检查znode的存在。如果指定的znode存在,它将返回znode的元数据。exists方法的用法如下:

exists(String path, boolean watcher)

参数:

  • path:节点路径
  • watcher:布尔值,指定是否要观看特定Z序节点或不看

代码实现如下:

/**
 * 判断某个节点是否存在
 * @param path 节点路径
 * @throws Exception 异常
 */
public Stat existsNode(String path) throws Exception{
    return zooKeeper.exists(path, true);
}
4、获取节点数据

ZooKeeper类提供了getData方法来获取附加到指定znode中的数据及其状态。getData方法的用法如下:

getData(String path, Watcher watcher, Stat stat)

参数:

  • path:Znode路径。
  • watcher:类型为Watcher的回调函数。当指定的znode的数据发生更改时,ZooKeeper集群将通过Watcher回调进行通知。这是一次性通知。
  • stat:返回Znode点的元数据。

代码实现如下:

/**
 * 获取节点数据
 * @param path 路径
 * @return
 * @throws Exception
 */
public String getData(String path) throws Exception{
    byte[] data = zooKeeper.getData(path, false, null);
    return new String(data);
}
5、修改节点

ZooKeeper类提供setData方法来修改附加到指定znode中的数据。setData方法的用法如下:

setData(String path, byte[] data, int version)

参数:

  • path:Znode路径
  • data:数据存储在指定的Znode点的路径。
  • version:znode的当前版本。每当更改数据时,ZooKeeper都会更新znode的版本号。

代码实现如下:

/**
 *  设置节点数据
 * @param path 路径
 * @param data 数据
 * @throws Exception
 */
public void setData(String path, String data) throws Exception{
    zooKeeper.setData(path, data.getBytes(), zooKeeper.exists(path, true).getVersion());
}
6、获取子节点数据

ZooKeeper类提供了getChildren方法来获取特定znode的所有子节点。getChildren方法的用法如下:

getChildren(String path, Watcher watcher)

参数:

  • path:Znode路径。
  • watcher:类型为Watcher的回调函数。当指定的znode的数据发生更改时,ZooKeeper集群将通过Watcher回调进行通知。这是一次性通知。

代码实现如下:

/**
     * 获取路径下的子节点
     * @param path 路径
     * @throws Exception
     */
    public void getChildren(String path) throws Exception{
        //首先判断节点是否存在
        Stat exists = zooKeeper.exists(path, true);
        if (null != exists){
            List<String> children = zooKeeper.getChildren(path, false);
            for (String child : children) {
                System.out.println(child);
            }
        }
    }
7、删除节点

ZooKeeper类提供了delete方法来删除指定的znode。delete方法的用法如下:

delete(String path, int version)

参数:

  • path:Znode路径。
  • version:znode的当前版本。

代码实现如下:

/**
 * 删除节点
 * @param path
 * @throws Exception
 */
public void deleteNode(String path) throws Exception{
    zooKeeper.delete(path, -1);
}
8、监听节点
/**
 * 监听节点
 * @param path 路径
 * @throws Exception
 */
public void addWatch(String path) throws Exception{
    zooKeeper.addWatch(path, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println(watchedEvent.getState());
        }
    }, AddWatchMode.PERSISTENT);
}
9、全部代码实现
public class ZkBaseOperations {
    private static final String ZOOKEEPER_ADDRESS = "ip:2181";
    private static final int SESSION_TIMEOUT = 3000;
    private ZooKeeper zooKeeper;
    public ZkBaseOperations() {
        try {
            CountDownLatch connectionLatch = new CountDownLatch(1);
            zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, event -> {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connectionLatch.countDown();
                    System.out.println("连接成功!");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭连接
     */
    public void close() {
        if (zooKeeper != null) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 创建节点
     * @param parentPath 父节点路径
     * @param nodeName 节点名称
     * @param data 数据
     */
    public void createNode(String parentPath, String nodeName, String data) throws Exception{
        zooKeeper.create(parentPath + "/" + nodeName ,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * 判断某个节点是否存在
     * @param path 节点路径
     * @throws Exception 异常
     */
    public Stat existsNode(String path) throws Exception{
        return zooKeeper.exists(path, true);
    }

    /**
     * 获取节点数据
     * @param path 路径
     * @return
     * @throws Exception
     */
    public String getData(String path) throws Exception{
        byte[] data = zooKeeper.getData(path, false, null);
        return new String(data);
    }

    /**
     *  设置节点数据
     * @param path 路径
     * @param data 数据
     * @throws Exception
     */
    public void setData(String path, String data) throws Exception{
        zooKeeper.setData(path, data.getBytes(), zooKeeper.exists(path, true).getVersion());
    }

    /**
     * 获取路径下的子节点
     * @param path 路径
     * @throws Exception
     */
    public void getChildren(String path) throws Exception{
        //首先判断节点是否存在
        Stat exists = zooKeeper.exists(path, true);
        if (null != exists){
            List<String> children = zooKeeper.getChildren(path, false);
            for (String child : children) {
                System.out.println(child);
            }
        }
    }

    /**
     * 删除节点
     * @param path
     * @throws Exception
     */
    public void deleteNode(String path) throws Exception{
        zooKeeper.delete(path, -1);
    }

    public static void main(String[] args) {
        ZkBaseOperations zookeeper = new ZkBaseOperations();
        try {
            //创建节点
            zookeeper.createNode("/javaxiaobear", "666","javaxiaobear is a good website");
            //判断阶段是否存在
            Stat stat = zookeeper.existsNode("/javaxiaobear/666");
            if(stat != null) {
                System.out.println("javaxiaobear Node exists and the node version is " +
                        stat.getVersion());
            } else {
                System.out.println("Node does not exists");
            }
            //获取节点数据
            String data = zookeeper.getData("/javaxiaobear/666");
            System.out.println("获取节点的数据为:" + data);
            //设置节点数据
            zookeeper.setData("/javaxiaobear", "666");
            //获取子节点
            zookeeper.getChildren("/javaxiaobear");
            //删除节点
            zookeeper.deleteNode("/javaxiaobear");
            zookeeper.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2、zkClient开源客户端

ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原⽣API接⼝之上进⾏了包装,是⼀个更易⽤的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。

1、如何使用

**添加依赖:**在pom.xml⽂件中添加如下内容

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

2、创建会话

使⽤ZkClient可以轻松的创建会话,连接到服务端。

public class CreateSession {

    /**
     * 连接地址
     */
    private final static String CLUSTER_CONNECT_STR="ip:2181";
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(CLUSTER_CONNECT_STR);
        System.out.println("zkClient 连接成功");
    }
}

运⾏结果:zkClient 连接成功。结果表明已经成功创建会话。

3、创建节点

ZkClient提供了递归创建节点的接⼝,即其帮助开发者先完成⽗节点的创建,再创建⼦节点

public class Create_Node_Sample {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("ip:2181");
 		//createParents的值设置为true,可以递归创建节点
		zkClient.createPersistent("/javaxiaobear",true);
 		System.out.println("创建节点成功");
    }
}

运⾏结果:success create znode.
结果表明已经成功创建了节点,值得注意的是,在原⽣态接⼝中是⽆法创建成功的(⽗节点不存在),但是通过ZkClient通过设置createParents参数为true可以递归的先创建⽗节点,再创建⼦节点

4、删除节点

ZkClient提供了递归删除节点的接⼝,即其帮助开发者先删除所有⼦节点(存在),再删除⽗节点。

public class DeleteNode {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("ip:2181");
		zkClient.deleteRecursive("/javaxiaobear");
 		System.out.println("删除节点成功");
    }
}

运⾏结果: 删除节点成功

结果表明ZkClient可直接删除带⼦节点的⽗节点,因为其底层先删除其所有⼦节点,然后再删除⽗节点

5、获取⼦节点

public class Get_Children_Sample {
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        List<String> children = zkClient.getChildren("/javaxiaobear");
        System.out.println(children);
        //注册监听事件
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String>
                                          currentChilds) throws Exception {
                System.out.println(parentPath + " 's child changed,currentChilds:" + currentChilds);
            }
        });
        zkClient.createPersistent("/javaxiaobear");
        Thread.sleep(1000);
        zkClient.createPersistent("/javaxiaobear/c1");
        Thread.sleep(1000);
        zkClient.delete("/javaxiaobear/c1");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

运⾏结果:

/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null

结果表明:客户端可以对⼀个不存在的节点进⾏⼦节点变更的监听。⼀旦客户端对⼀个节点注册了⼦节点列表变更监听之后,那么当该节点的⼦节点列表发⽣变更时,服务端都会通知客户端,并将最新的⼦节点列表发送给客户端,该节点本身的创建或删除也会通知到客户端。

6、获取数据(节点是否存在、更新、删除)

public class Get_Data_Sample {
    public static void main(String[] args) throws InterruptedException {
        String path = "/javaxiaobear-zk";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        //判断节点是否存在
        boolean exists = zkClient.exists(path);
        if (!exists){
            zkClient.createEphemeral(path, "123");
        }
        //注册监听
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataChange(String path, Object data) throws
                    Exception {
                System.out.println(path+"该节点内容被更新,更新后的内容"+data);
            }
            public void handleDataDeleted(String s) throws Exception {
                System.out.println(s+" 该节点被删除");
            }
        });
        //获取节点内容
        Object o = zkClient.readData(path);
        System.out.println(o);
        //更新
        zkClient.writeData(path,"4567");
        Thread.sleep(1000);
        //删除
        zkClient.delete(path);
        Thread.sleep(1000);
    }
}

运⾏结果:

123
/javaxiaobear-zk该节点内容被更新,更新后的内容4567
/javaxiaobear-zk 该节点被删除

3、Curator开源客户端使用

Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。

Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

Guava is to Java that Curator to ZooKeeper

在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。

官网:https://curator.apache.org/

1、如何使用

引入依赖

Curator 包含了几个包:

  • curator-framework是对ZooKeeper的底层API的一些封装。

  • curator-client提供了一些客户端的操作,例如重试策略等。

  • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
 
<!--curator-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2、常用API

1、创建一个客户端实例

在使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例。这是一个CuratorFramework类型的对象,有两种方法:

  1. 使用工厂类CuratorFrameworkFactory的静态newClient()方法。

    public static CuratorFramework newClient(String connectString,RetryPolicy retryPolicy)
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
    
    • connectString:指的是需要连接的zookeeperAddress ip
    • retryPolicy:指的是连接zk时使用哪种重试策略
    • sessionTimeoutMs:指的是会话超时时间
    • connectionTimeoutMs:指的是连接超时时间

    代码实现如下:

    public class CuratorDemo {
        private final static String CLUSTER_CONNECT_STR="ip:2181";
    
        public static void main(String[] args) throws Exception {
            // 重试策略
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    //        //创建客户端实例
            CuratorFramework client = CuratorFrameworkFactory.newClient(CLUSTER_CONNECT_STR, retryPolicy);
            //启动客户端
            client.start();
            System.out.println("zookeeper初始化连接成功:" + client);
            //关闭连接
            client.close();
        }
    }
    
  2. 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。

    public class CuratorDemo {
        private final static String CLUSTER_CONNECT_STR="ip:2181";
    
        public static void main(String[] args) throws Exception {
            //重试策略
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(CLUSTER_CONNECT_STR)
                    .sessionTimeoutMs(5000)  // 会话超时时间
                    .connectionTimeoutMs(5000) // 连接超时时间
                    .retryPolicy(retryPolicy)
                    .namespace("base") // 包含隔离名称
                    .build();
            //启动客户端
            client.start();
            System.out.println("zookeeper初始化连接成功:" + client);
            //关闭连接
            client.close();
        }
    }
    
    • connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。

    • retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。

    策略名称描述
    ExponentialBackoffRetry重试一组次数,重试之间的睡眠时间增加
    RetryNTimes重试最大次数
    RetryOneTime只重试一次
    RetryUntilElapsed在给定的时间结束之前重试
    • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
2、创建节点

主要方法有以下:

public T forPath(String path) 
 //创建节点,并赋值内容
 public T forPath(String path, byte[] data)
 //判断节点是否存在,节点存在了,创建时仍然会报错
 public ExistsBuilder checkExists()

代码实现如下:

/**
 * 创建节点
 * @param nodePath 路径
 * @param data 数据
 */
public void createNode(String nodePath, String data) throws Exception{
    if (StringUtils.isEmpty(nodePath)) {
        System.out.println("节点【" + nodePath + "】不能为空");
    }
    //1、对节点是否存在进行判断,否则会报错:【NodeExistsException: KeeperErrorCode = NodeExists for /root】
    Stat exists = client.checkExists().forPath(nodePath);
    if (null != exists) {
        System.out.println("节点【" + nodePath + "】已存在,不能新增");
    }
    //2、创建节点 永久节点
    client.create().forPath(nodePath);
    // 2.1 手动指定节点的类型
    client.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);
    //2.2 如果父节点不存在可创建当前的父节点
    String node = client.create().creatingParentsIfNeeded().forPath(nodePath);
    System.out.println(node);
    //创建节点,并为当前节点赋值内容
    if (!StringUtils.isBlank(data)) {
        //2.3、创建永久节点,并为当前节点赋值内容
        client.create()
                .forPath(nodePath, data.getBytes());
        //2.4、创建永久有序节点
        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(nodePath, data.getBytes());
        //2.5、创建临时节点
        client.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(nodePath, data.getBytes());
    }
    //2.6、创建临时有序节点
    client.create()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath(nodePath, data.getBytes());
}

在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。

3、获取节点数据

主要的方法如下:

//获取某个节点数据
client.getData().forPath(nodePath)
//读取zookeeper的数据,并放到Stat中
client.getData().storingStatIn(stat1).forPath(nodePath)

参数说明:

path:指定要读取的节点

stat:指定数据节点的节点状态信息

4、设置修改节点

我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。

/**
 * 设置修改节点数据
 * @param nodePath 节点路径
 * @param data 数据
 */
public void setData(String nodePath, String data) throws Exception{
    //更新节点
    Stat stat = client.setData().forPath(nodePath, data.getBytes());
    // 指定版本号,更新节点,更新的时候如果指定数据版本的话,那么需要和zookeeper中当前数据的版本要一致,-1表示匹配任何版本
    Stat stat1 = client.setData().withVersion(-1).forPath(nodePath, data.getBytes());
    //异步设置某个节点数据
    Stat stat2 = client.setData().inBackground().forPath(nodePath, data.getBytes());
    System.out.println(stat1.toString());
}
5、获取某个节点的子节点
/**
 * 获取某个节点的子节点
 * @param nodePath
 * @throws Exception
 */
public void getChildren(String nodePath) throws Exception{
    List<String> list = client.getChildren().forPath(nodePath);
    for (String s : list) {
        System.out.println(s);
    }
}
6、删除节点
/**
 * 删除节点
 * @param nodePath 节点路径
 * @throws Exception
 */
public void delete(String nodePath) throws Exception{
    client.delete().forPath(nodePath);
    //删除节点,即使出现网络故障,zookeeper也可以保证删除该节点
    client.delete().guaranteed().forPath(nodePath);
    //级联删除节点(如果当前节点有子节点,子节点也可以一同删除)
    client.delete().deletingChildrenIfNeeded().forPath(nodePath);
}
  • guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
  • deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。

3、功能接口实现

1、异步接口

Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

public interface BackgroundCallback
{
    /**
     * Called when the async background operation completes
     *
     * @param client the client
     * @param event operation result details
     * @throws Exception errors
     */
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

如上接口,主要参数为 client 客户端, 和 服务端事件 event ,inBackground 异步处理默认在EventThread中执行

制定线程池异步处理

/**
 * 指定线程池 获取节点数据
 * @param nodePath
 * @throws Exception
 */
public void testExecutorService(String nodePath) throws Exception{
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    byte[] data = client.getData()
            .inBackground((o1, o2) -> {
    }, executorService).forPath(nodePath);
    System.out.println(new String(data));
}
2、Curator 监听器

API方法如下:

/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener
{
    /**
     * Called when a background task has completed or a watch has triggered
     *
     * @param client client
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听

Curator Caches

Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。

node cache:

NodeCache 对某一个节点进行监听

public NodeCache(CuratorFramework client, String path);

可以通过注册监听器来实现,对当前节点数据变化的处理

public void addListener(NodeCacheListener listener)

代码实现:

public class CuratorNodeCacheTest {
    private static final String ZOOKEEPER_ADDRESS = "ip:2181";
    private static final int SESSION_TIMEOUT = 3000;

    public CuratorFramework client;
    public CuratorNodeCacheTest() {
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(ZOOKEEPER_ADDRESS, retryPolicy);
        client.start();
    }

    /**
     *
     * @param nodePath 路径
     */
    public void nodeCache(String nodePath) throws Exception {
        NodeCache nodeCache = new NodeCache(client, nodePath);
        //监听当前节点路径
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                //查询节点数据
                byte[] data = client.getData().forPath(nodePath);
                System.out.println(new String(data));
            }
        });
        nodeCache.start();
    }
}

path cache:

PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听,

public PathChildrenCache(CuratorFramework client,
                         String path,
                         boolean cacheData)

可以通过注册监听器来实现,对当前节点的子节点数据变化的处理

public void addListener(PathChildrenCacheListener listener)
/**
 * 
 * @param nodePath 节点路径
 * @throws Exception
 */
public void pathCache(String nodePath) throws Exception{
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            System.out.println(event);
        }
    });
    // 如果设置为true则在首次启动时就会缓存节点内容到Cache中
    pathChildrenCache.start(true);
}

tree cache:

TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。

public TreeCache(CuratorFramework client, String path, boolean cacheData)

可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理

public void addListener(TreeCacheListener listener)
public void treeCache(String nodePath) throws Exception{
    TreeCache treeCache = new TreeCache(client, nodePath);
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            System.out.println(event);
        }
    });
    treeCache.start();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/272847.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp中uview的text组件

基本使用&#xff1a; 通过text参数设置文本内容。推荐您使用:textvalue的形式 <u--text text"我用十年青春,赴你最后之约"></u--text>设置主题&#xff1a; 通过type参数设置文本主题&#xff0c;我们提供了五类属性。primary error success warning…

STM32 cubeMX 人体红外模块实验

本文代码使用HAL库。 文章目录 前言一、人体红外模块介绍工作原理&#xff1a; 二、人体红外原理图解读三、STM32 cubeMX配置红外模块四、代码编写总结 前言 实验开发板&#xff1a;STM32F051K8。所需软件&#xff1a;keil5 &#xff0c; cubeMX 。实验目的&#xff1a;了解 人…

【流复制环境PostgreSQL-14.1到PostgreSQL-16.1大版本升级】

PostgreSQL大版本会定期添加新特性&#xff0c;这些新特性通常会改变系统表的布局&#xff0c;但内部数据存储格式很少改变。pg_upgrade通过创建新的系统表和重用旧的用户数据文件来执行快速升级。 pg_upgrade升级主要有三种用法&#xff1a; 1、使用pg_upgrade拷贝升级。 2、…

Shell三剑客:awk(awk编辑编程)三

一、For 循环 For 循环的语法 for(variable addignment; condition; iteration peocess) {statement1statement2... } #for 语句首先执行初始化动作( initialisation )&#xff0c;然后再检查条件( condition )。如果条件为真&#xff0c;则执行动作( action )&#xff0c;然后…

【论文笔记】Run, Don’t Walk: Chasing Higher FLOPS for Faster Neural Networks

论文地址&#xff1a;Run, Dont Walk: Chasing Higher FLOPS for Faster Neural Networks 代码地址&#xff1a;https://github.com/jierunchen/fasternet 该论文主要提出了PConv&#xff0c;通过优化FLOPS提出了快速推理模型FasterNet。 在设计神经网络结构的时候&#xff…

网络编程--网络基础

这里写目录标题 协议的概念什么是协议典型协议 分层模型OSI七层模型与TCP/TP四层模型 通信过程协议格式以太网帧协议&#xff08;主要作用与mac地址&#xff0c;也就是网卡&#xff09;mac地址格式ARP协议总结 IP协议&#xff08;主要作用于IP&#xff09;UDP与TCP协议&#xf…

one wire(单总线)FPGA代码篇

一.引言 单总线&#xff08;OneWire&#xff09;是一种串行通信协议&#xff0c;它允许多个设备通过一个单一的数据线进行通信。这个协议通常用于低速、短距离的数字通信&#xff0c;特别适用于嵌入式系统和传感器网络。 二.one wire通信优点缺点 优点&#xff1a; 单一数据线…

扫描全能王启动鸿蒙原生应用开发,系HarmonyOS NEXT智能扫描领域首批

近期&#xff0c;“鸿蒙合作签约暨扫描全能王鸿蒙原生应用开发启动仪式”&#xff08;简称“签约仪式”&#xff09;正式举行。合合信息与华为达成鸿蒙合作&#xff0c;旗下扫描全能王将基于HarmonyOS NEXT正式启动鸿蒙原生应用开发。据悉&#xff0c;扫描全能王是鸿蒙在智能扫…

TG7050CKN,TG7050SKN ,TG7050CMN,TG7050SMN

爱普生推出的温补晶振型号&#xff1a;TG7050CKN&#xff0c;TG7050SKN &#xff0c;TG7050CMN&#xff0c;TG7050SMN频率范围为 10mhz ~ 54mhz 适用于广泛的频率需求。这几款的特点就是耐高温&#xff0c;温度可达105℃高温&#xff0c;而且都是高稳定性温补晶振&#xff0c;&…

【C++】开源:fast-cpp-csv-parser数据解析库配置使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍fast-cpp-csv-parser数据解析库配置使用。 无专精则不能成&#xff0c;无涉猎则不能通。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一…

钦丰科技(安徽)股份有限公司携卫生级阀门管件盛装亮相2024发酵展

钦丰科技(安徽)股份有限公司携卫生级阀门管件盛装亮相2024济南生物发酵展&#xff01; 展位号&#xff1a;2号馆A65展位 2024第12届国际生物发酵产品与技术装备展览会&#xff08;济南&#xff09;于3月5-7日在山东国际会展中心盛大召开&#xff0c;展会同期将举办30余场高质…

ubuntu22.04搭建RTSP服务器

大致命令如下&#xff1a; git clone --depth 1 gitgithub.com:ZLMediaKit/ZLMediaKit.git sudo apt-get install build-essential sudo apt-get install cmake #除了openssl,其他其实都可以不安装 sudo apt-get install libssl-dev sudo apt-get install libsdl-dev sudo apt…

医院信息化-6 大模型与医疗

之前写了一系列跟医疗信息化相关的内容&#xff0c;其中有提到人工智能&#xff0c;但是写的都是原先的一些AI算法基础上的医疗应用。现在大模型出现的涌现推理能力确实让人惊讶&#xff0c;并且出现可商用化的可能性&#xff0c;因此最近一年关于大模型在医疗的应用也开始出现…

使用ffmpeg实现视频旋转并保持清晰度不变

1 原始视频信息 通过ffmpeg -i命令查看视频基本信息 ffmpeg -i source.mp4 ffmpeg version 6.1-essentials_build-www.gyan.dev Copyright (c) 2000-2023 the FFmpeg developersbuilt with gcc 12.2.0 (Rev10, Built by MSYS2 project)configuration: --enable-gpl --enable-…

智能三维数据虚拟现实电子沙盘

一、概述 易图讯科技&#xff08;www.3dgis.top&#xff09;以大数据、云计算、虚拟现实、物联网、AI等先进技术为支撑&#xff0c;支持高清卫星影像、DEM高程数据、矢量数据、无人机倾斜摄像、BIM模型、点云、城市白模、等高线、标高点等数据融合和切换&#xff0c;智能三维数…

python作业题百度网盘,python作业答案怎么查

大家好&#xff0c;小编来为大家解答以下问题&#xff0c;python作业题百度网盘&#xff0c;python作业答案怎么查&#xff0c;今天让我们一起来看看吧&#xff01; 1 以下代码的输出结果为&#xff1a; alist [1, 2, 3, 4] print(alist.reverse()) print(alist) A.[4, 3, 2, …

根据DCT特征训练CNN

记录一次改代码的挣扎经历&#xff1a; 看了几篇关于DCT频域的深度模型文献&#xff0c;尤其是21年FcaNet&#xff1a;基于DCT 的attention model&#xff0c;咱就是说想试试将我模型的输入改为分组的DCT系数&#xff0c;然后就开始下面的波折了。 第一次尝试&#xf…

在Centos7中利用Shell脚本:实现MySQL的数据备份

目录 自动化备份MySQL 一.备份数据库脚本 1.创建备份目录 2.创建脚本文件 3.新建配置文件&#xff08;连接数据库的配置文件&#xff09; 4.给文件权限(mysql_backup.sh) ​编辑 5.执行命令 (mysql_backup.sh) ​编辑 二.数据库通过备份恢复 1.创建脚…

多维时序 | MATLAB实现SSA-BiLSTM麻雀算法优化双向长短期记忆神经网络多变量时间序列预测

多维时序 | MATLAB实现SSA-BiLSTM麻雀算法优化双向长短期记忆神经网络多变量时间序列预测 目录 多维时序 | MATLAB实现SSA-BiLSTM麻雀算法优化双向长短期记忆神经网络多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.MATLAB实现SSA-BiLSTM麻雀算法优化…

k8s的二进制部署: 源码包部署

服务器IP软件包k8s--master0120.0.0.61kube-aplserver&#xff0c;kube-controer-manager&#xff0c;kube-scheduler&#xff0c;etcdk8s--master0220.0.0.62kube-controer-manager&#xff0c;kube-schedulernode节点0120.0.0.62kubelet&#xff0c;kube-proxy&#xff0c;et…
最新文章