五、Zookeeper基于API操作Node节点

ErixHao 2020-05-30

安装zookeeper :linux下安装Zookeeper 3.4.14

zookeeper 分为5个包:

  1. org.apache.zookeeper //客户端主要类文件

  2. org.apache.zookeeper.data //

  3. org.apache.zookeeper.server

  4. org.apache.zookeeper.server.quorum

  5. org.apache.zookeeper.server.upgrade

建立会话

引入依赖:

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateSession implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);


    /*
      建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException {

     /*
        客户端可以通过创建一个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端口
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
     */

        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
        System.out.println(zooKeeper.getState());

        // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        countDownLatch.await();

        System.out.println("客户端与服务端会话真正建立了");

    }

    /*
        回调方法:处理来自服务器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程序在CountDownLatch上的等待阻塞
            System.out.println("process方法执行了...");
            countDownLatch.countDown();

        }


    }
}

注意:ZooKeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中处于“ CONNECTING的状态。当该会话真正创建完毕后 ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话

创建节点

节点介绍:

/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
*         ANYONE_ID_UNSAFE : 表示任何人
*         AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
*         OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
*         CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
*        PERSISTENT:持久节点
*        PERSISTENT_SEQUENTIAL:持久顺序节点
*        EPHEMERAL:临时节点
*        EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateNote implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;


    /*
      建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

     /*
        客户端可以通过创建一个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端口
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
     */

         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
        System.out.println(zooKeeper.getState());

        // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);

    }



    /*
        回调方法:处理来自服务器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程序在CountDownLatch上的等待阻塞
            System.out.println("process方法执行了...");
            // 创建节点
            try {
                createNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }


    }


    /*
       创建节点的方法
   */
    private static void createNoteSync() throws KeeperException, InterruptedException {

        /**
         *  path        :节点创建的路径
         *  data[]      :节点创建要保存的数据,是个byte类型的
         *  acl         :节点创建的权限信息(4种类型)
         *                 ANYONE_ID_UNSAFE    : 表示任何人
         *                 AUTH_IDS    :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
         *                 OPEN_ACL_UNSAFE    :这是一个完全开放的ACL(常用)--> world:anyone
         *                 CREATOR_ALL_ACL  :此ACL授予创建者身份验证ID的所有权限
         *  createMode    :创建节点的类型(4种类型)
         *                  PERSISTENT:持久节点
         *                    PERSISTENT_SEQUENTIAL:持久顺序节点
         *                  EPHEMERAL:临时节点
         *                  EPHEMERAL_SEQUENTIAL:临时顺序节点
         String node = zookeeper.create(path,data,acl,createMode);
         */

        // 持久节点
        String note_persistent = zooKeeper.create("/lg-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // 临时节点
        String note_ephemeral = zooKeeper.create("/lg-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // 持久顺序节点
        String note_persistent_sequential = zooKeeper.create("/lg-persistent_sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

        System.out.println("创建的持久节点" + note_persistent);
        System.out.println("创建的临时节点" + note_ephemeral);
        System.out.println("创建的持久顺序节点" + note_persistent_sequential);

    }
}

读取节点:

public class GetNoteData implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;


    /*
      建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

     /*
        客户端可以通过创建一个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端口
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
     */

         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());
        System.out.println(zooKeeper.getState());

        // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);

    }



    /*
        回调方法:处理来自服务器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {

        /*
            子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知
            要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听
         */
        if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){

            List<String> children = null;
            try {
                children = zooKeeper.getChildren("/lg-persistent", true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(children);

        }



        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程序在CountDownLatch上的等待阻塞
            System.out.println("process方法执行了...");

            // 获取节点数据的方法
            try {
                getNoteData();

                // 获取节点的子节点列表方法
                getChildrens();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        }


    }

    /*
        获取某个节点的内容
     */
    private void getNoteData() throws KeeperException, InterruptedException {

        /**
         * path    : 获取数据的路径
         * watch    : 是否开启监听
         * stat    : 节点状态信息
         *        null: 表示获取最新版本的数据
         *  zk.getData(path, watch, stat);
         */
        byte[] data = zooKeeper.getData("/lg-persistent", false, null);
        System.out.println(new String(data));


    }


    /*
        获取某个节点的子节点列表方法
     */
    public static void getChildrens() throws KeeperException, InterruptedException {

        /*
            path:路径
            watch:是否要启动监听,当子节点列表发生变化,会触发监听
            zooKeeper.getChildren(path, watch);
         */
        List<String> children = zooKeeper.getChildren("/lg-persistent", true);
        System.out.println(children);



    }



}

删除节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class DeleteNote implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;


    /*
      建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

     /*
        客户端可以通过创建一个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端口
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
     */

         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());
        System.out.println(zooKeeper.getState());

        // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);

    }



    /*
        回调方法:处理来自服务器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程序在CountDownLatch上的等待阻塞
            System.out.println("process方法执行了...");
            // 删除节点
            try {
                deleteNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        }


    }

    /*
        删除节点的方法
     */
    private void deleteNoteSync() throws KeeperException, InterruptedException {

        /*
          zooKeeper.exists(path,watch) :判断节点是否存在
          zookeeper.delete(path,version) : 删除节点
      */

        Stat stat = zooKeeper.exists("/lg-persistent/c1", false);
        System.out.println(stat == null ? "该节点不存在":"该节点存在");

        if(stat != null){
            zooKeeper.delete("/lg-persistent/c1",-1);
        }

        Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false);
        System.out.println(stat2 == null ? "该节点不存在":"该节点存在");



    }


}

更新节点

/*
        更新数据节点内容的方法
     */
    private void updateNoteSync() throws KeeperException, InterruptedException {

         /*
            path:路径
            data:要修改的内容 byte[]
            version:为-1,表示对最新版本的数据进行修改
            zooKeeper.setData(path, data,version);
         */


        byte[] data = zooKeeper.getData("/lg-persistent", false, null);
        System.out.println("修改前的值:" + new String(data));

        //修改/lg-persistent 的数据 stat: 状态信息对象
        Stat stat = zooKeeper.setData("/lg-persistent", "客户端修改了节点数据".getBytes(), -1);

        byte[] data2 = zooKeeper.getData("/lg-persistent", false, null);
        System.out.println("修改后的值:" + new String(data2));

    }

相关推荐