Zookeeper入门实战

iamdll 2020-03-27

Zookeeper是一个为分布式应用提供一致性协调服务的中间件,主要用来解决分布式应用中经常遇到的一些一致性问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。本文主要包括Zookeeper简介、安装、命令行操作、java操作Zookeeper等,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、junit 4.13、Centos 7.6。

1、简介

1.1、设计目标

ZooKeeper is simple.
ZooKeeper is replicated.
ZooKeeper is ordered.
ZooKeeper is fast.

ZooKeeper是简单、可复制、有序、快速的。

1.2、数据模型和层次命名空间

ZooKeeper提供的命名空间与标准文件系统的命名空间非常类似。命名空间由一系列路径组成,用/分隔。ZooKeeper命名空间中的每个节点使用一个具体路径来标识。ZooKeeper的层次命名空间结构如下:

Zookeeper入门实战

1.3、节点

与标准文件系统不同的是,ZooKeeper命名空间的每个节点可以保存数据,就像一个文件系统中的文件,它既是文件也是目录。ZooKeeper用来存储状态信息、配置、位置信息等,因此存储在每个节点上的数据通常很小,在字节到千字节范围内。有四种类型的节点:

临时节点(EPHEMERAL):会话结束该节点自动被删除,临时节点不能拥有子节点
临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会在节点名称后面增加一个序列号,分布式锁中会用到该类型节点
持久节点(PERSISTENT):创建后永久存在,可以自动删除;也可以设置一个存活时间,当指定存活时间过去以后,如果相应的节点没有得到更新且没有直接的,就会被自动删除
持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会在节点名称后面增加一个序列号

注:顺序节点中序列号对于此节点的父节点是唯一的,它是一个10位的数字,如果这个序列号大于2^32-1就会溢出。

1.4、更新和监视

客户端可以监视一个节点,当该节点发生变化时会,客户端会收到该节点变化的通知;一个监视器只会触发一次,触发后会删除该监视器。如果客户端和其中一个ZooKeeper服务器之间的连接中断,则客户端将收到一个本地通知。

1.5、状态信息

zxid:zookeeper每次状态改变都收到一个zxid(ZooKeeper Transaction Id),zxid是全局有序的,每次更新都会产生一个新的,且后面的大于前面的。
版本:每次节点改变都会使该节点的版本号增加,有三中版本号:dataversion(数据版本号)、cversion(子节点版本号)、aclversion(节点所拥有的ACL版本号)

通过stat [-w] path可以查看节点的具体状态信息:

cZxid 创建节点时的事务ID
ctime 创建节点时的时间
mZxid 最后修改节点的事务ID
mtime 最后修改节点的时间
pZxid 该节点的子节点最后一次修改的事务ID,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该ID
cversion 子节点版本号,子节点每次修改版本号加1
dataversion 数据版本号,数据每次修改该版本号加1
aclversion 权限版本号,权限每次修改该版本号加1
ephemeralOwner 节点的会话id,只有临时节点有,持久节点值为0
dataLength 节点的数据长度
numChildren 节点的子节点数量

1.6、特性

ZooKeeper的目标是作为构建其他复杂服务的基石,因此它提供了一系列的特性:

一致性:数据一致性, 数据按照顺序分批入库
原子性:事务要么成功要么失败
单一视图:客户端连接集群中的任意zk节点, 数据都是一致的
可靠性:每次对zk的操作状态都会保存在服务端
实时性::客户端可以读取到zk服务端的最新数据

2、zoo.cfg参数说明

clientPort zookeeper服务器对客户端暴露的端口
dataDir zookeeper服务器存储快照文件的目录,事务日志文件默认也保存在该目录下,除非另外指定。
dataLogDir 服务器存储事务日志文件的目录,默认与dataDir一致。建议将它和dataDir分别配置,防止磁盘的并发读写,影响服务器性能。可将其配置在一个单独的磁盘上。
tickTime 服务器最小时间单元,默认值3000ms
initLimit leader服务器等待Follewer服务器启动,并完成数据同步的时间,默认为10,表示10*tickTime
syncLimit leader服务器和Follewer服务器之间进行心跳检测的间隔时间,默认为5,表示5*tickTime
server.id zookeeper集群的机器列表,其中id为serverId,与myid文件中的值对应。第一个端口用于指定Leader服务器和Follewer服务器进行运行时通信和数据同步所使用的端口,第二个端口用于进行Leader选举过程中的投票通信

3、安装

3.1、单机版安装

3.1.1、下载并解压Zookeeper

下载地址:http://zookeeper.apache.org/releases.html

解压:tar zxvf zookeeper-3.6.0.tar.gz

3.1.2、修改配置文件

zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg

配置文件zoo.cfg中的内容可以使用文件中的默认值,也可以根据实际需要修改配置项:

dataDir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data

3.1.3、启动停止

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin
zkServer.sh start  #启动
zkServer.sh stop  #停止

3.2、集群安装

假设在172.17.139.160、172.17.139.161、172.17.139.162三台机器上安装。

3.2.1、下载并解压Zookeeper(每台机器)

下载地址:http://zookeeper.apache.org/releases.html

解压:tar zxvf zookeeper-3.6.0.tar.gz

3.2.2、修改zoo.cfg配置文件(每台机器)

zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg

zoo.cfg中集群与单机的配置不同的地方是server.id参数,其他根据实际需要修改配置项:

dataDir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data
server.1=172.17.139.160:2555:3555
server.2=172.17.139.161:2555:3555
server.3=172.17.139.162:2555:3555

3.2.3、创建myid文件(每台机器)

在dataDir(/home/hadoop/app/apache-zookeeper-3.6.0-bin/data)目录下创建myid文件,文件内容为该zookeeeper在集群中的id,对应上面zoo.cfg中server.后的数字。

172.17.139.160:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:

1

172.17.139.161:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:

2

172.17.139.162:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:

3

3.2.4、启动停止(每台机器)

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin
zkServer.sh start #启动
zkServer.sh stop  #停止

4、命令行

bin/zkCli.sh可以启动一个客户端连接到Zookeeper:

bin/zkCli.sh [-server host:port]

不加server参数,默认连接到本地2181端口;启动后可以输入help/h查看使用方法:

[zk: localhost:2181(CONNECTED) 4] help
ZooKeeper -server host:port cmd args
        addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
        addauth scheme auth
        close 
        config [-c] [-w] [-s]
        connect host:port
        create [-s] [-e] [-c] [-t ttl] path [data] [acl]
        delete [-v version] path
        deleteall path [-b batch size]
        delquota [-n|-b] path
        get [-s] [-w] path
        getAcl [-s] path
        getAllChildrenNumber path
        getEphemerals path
        history 
        listquota path
        ls [-s] [-w] [-R] path
        printwatches on|off
        quit 
        reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
        redo cmdno
        removewatches path [-c|-d|-a] [-l]
        set [-s] [-v version] path data
        setAcl [-s] [-v version] [-R] path acl
        setquota -n|-b val path
        stat [-w] path
        sync path
        version 
Command not found: Command not found help
[zk: localhost:2181(CONNECTED) 5]

4.1、列出子节点

ls [-s] [-w] [-R] path

-s:显示节点状态信息
-w:监听该节点
-R:递归查看所有子节点

如:ls /

4.2、创建节点

create [-s] [-e] [-c] [-t ttl] path [data] [acl]

-s:顺序节点
-e:临时节点
-t:设置存活时间(针对持久节点,单位秒);需要开启,默认是关闭的,参见第6小节:TTL(Time To Life)
acl:权限控制

如:create /test test

4.3、查看节点

get [-s] [-w] path

-s:显示状态
-w:监听该节点

如:get /test

4.4、设置节点

set [-s] [-v version] path data

-s:返回状态信息
-v:设置版本信息

如:set /test testaa

4.4、查看节点状态

stat [-w] path

-w:监视该节点

如:stat /test

4.5、删除节点

delete [-v version] path

-v:指定版本信息

如:delete /test

4.6、设置权限

setAcl [-s] [-v version] [-R] path acl

-s:返回状态信息
-v:指定版本信息
-R:递归设置权限

4.7、查看权限

getAcl [-s] path

-s:返回状态信息

5、权限控制ACL(Access Control List)

ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,子节点不会继承父节点的权限;ACL由三个字段组成:schema:id:permission。

5.1、schema(权限模式)

world 只有一个id,anyone,代表所有人
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证
ip 使用IP地址认证
x509 使用客户端X500 Principal认证

5.2、id(授权对象)

权限赋予的用户或者一个实体

word对应的id只有一个:anyone
digest自定义id,通常为“usernmae:BASE64(SHA-1(username:password))”
ip对应的id为一个ip或ip段,如10.49.196.10、10.49.196.0、24

5.3、permission(权限)

CREATE(c) 可以创建子节点
READ(r) 可以读取节点数据及显示子节点列表
WRITE(w) 可以设置节点数据
DELETE(d) 可以删除子节点(仅下一级节点)
ADMIN(a) 可以设置节点权限

5.4、例子

5.4.1、word例子

setAcl /acltest world:anyone:cdrwa

创建节点时如果没有设置权限,这是默认的权限。

5.4.2、auth例子

addauth digest jack:123456 #先添加认证用户
setAcl /acltest auth:jack:cdrwa

再开一个终端需先添加认证用户(addauth digest jack:123456)才能访问/actltest

5.4.3、digest例子

echo -n jack:123456 | openssl dgst -binary -sha1 | openssl base64#得到密文tgi9UCnyPo5FJjVylKr05nAlWeg=
setAcl /acltest digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa

添加认证用户(addauth digest jack:123456)后才能访问/actltest。

5.4.4、ip例子

setAcl /acltest ip:10.49.196.10:cdrwa

10.49.196.10的机器才能访问/actltest。

6、TTL(Time To Life)

 在zookeeper中,当创建一个PERSISTENT或者PERSISTENT_SEQUENTIAL节点的时候,可以有选择的给这个节点设置一个存活时间(TTL);当指定存活时间过去以后,如果该节点没有得到更新且没有直接的,就会被自动删除。

默认该特性是关闭的,如果需要设置java系统属性:zookeeper.extendedTypesEnabled;由于TTL节点是在3.5.3版本增加的,3.5.4/3.6.0版本并不支持,所以在3.5.4/3.6.0等其他版本还需设置另外一个java系统属性:Dzookeeper.emulate353TTLNodes。可以修改zkServer.sh脚本,增加:

-Dzookeeper.extendedTypesEnabled=true -Dzookeeper.emulate353TTLNodes=true

在zkServer.sh脚本里查找到start关键字,在如下图所示的地方增加上面的代码,如何重启Zookeeper即可。

Zookeeper入门实战

7、Java操作Zookeeper

7.1、原生API操作Zookeeper

7.1.1、引入依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>

7.1.2、基本操作

package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

/**
 * Zookeeper基本操作列子
 */
public class ZookeeperCase {
    //Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static String connectString = "10.49.196.10:2181";
    private static int sessionTimeout = 2 * 1000;

    private ZooKeeper zooKeeper;

    @Before
    public void before() {
        try {
            zooKeeper =  new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {

                }
            });
            System.out.println(zooKeeper.getState());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @After
    public void after() throws Exception {
        zooKeeper.close();
    }

    /**
     * 创建节点
     */
    @Test
    public void create() throws Exception {
        /*
         * 同步创建持久节点,ACL为world:anyone:cdrwa
         * 等同于该命令:create /javatest/node1 test world:anyone:cdrwa
         */
        zooKeeper.create("/javatest/node1", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        /*
         * 同步创建持久节点,ACL为world:anyone:cr
         * 等同于该命令:create /javatest/node2 test world:anyone:cr
         */
        zooKeeper.create("/javatest/node2", "test".getBytes(), Collections.singletonList(new ACL((ZooDefs.Perms.CREATE + ZooDefs.Perms.READ), ZooDefs.Ids.ANYONE_ID_UNSAFE)), CreateMode.PERSISTENT);

        /*
         * 异步创建临时顺序节点,ACL为ip:127.0.0.1:c
         * 等同于该命令:create -s -e /javatest/node3 test ip:127.0.0.1:c
         */
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.create("/javatest/node3", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, new Id("ip", "127.0.0.1"))), CreateMode.EPHEMERAL_SEQUENTIAL
            ,new AsyncCallback.StringCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, String name) {
                    System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ", name=" + name);
                    counter.countDown();
                }
            }, "上下文对象,异步回调时会传递给callback");
        counter.await();

        /*
         * 同步创建持久节点,ACL为digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
         * 等同于该命令:create /javatest/node4 test digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
         * 添加认证用户(addauth digest jack:123456)后才能访问/javatest/node4
         */
        zooKeeper.create("/javatest/node4", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg="))) , CreateMode.PERSISTENT);

        /*
         * 同步创建顺序持久节点,ACL为world:anyone:cdrwa,存活时间为5秒
         * 等同于该命令:create -s -t 5000 /javatest/node5 test
         */
        Stat stat = new Stat();
        zooKeeper.create("/javatest/node5", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, stat, 5000);
        System.out.println(stat);
    }

    /**
     * 获取节点数据
     * @throws Exception
     */
    @Test
    public void getData() throws Exception {
        //同步读取数据
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/javatest/node1", false, stat);
        System.out.println(new String(data));
        System.out.println(stat);

        //异步读取数据
        zooKeeper.addAuthInfo("digest", "jack:123456".getBytes());
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.getData("/javatest/node4", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                String s = "";
                if (data != null) {
                    s = new String(data);
                }
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + s + ",stat=" + stat);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    @Test
    public void setData() throws Exception {
        //同步设置数据,version为-1表示匹配任何版本
        Stat stat = zooKeeper.setData("/javatest/node1", "test2".getBytes(), -1);
        System.out.println(stat);

        //异步设置数据
        zooKeeper.addAuthInfo("digest", "jack:123456".getBytes());
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.setData("/javatest/node4", "test2".getBytes(), -1, new AsyncCallback.StatCallback(){
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc=" + rc + ",path=" + path + ",stat=" + stat);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    @Test
    public void delete() throws Exception {
        //同步删除数据
        zooKeeper.delete("/javatest/node1", -1);

        //异步删除数据
        CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.delete("/javatest/node2", -1,  new AsyncCallback.VoidCallback(){
            @Override
            public void processResult(int rc, String path, Object ctx) {
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }
}

7.1.3、监控节点

DataMonitor类实现对节点的监控,节点有变化时会回调DataMonitorListener.process方法,该方法由调用方根据业务来实现;WatcherCase类传入需要的参数来启动DataMonitor。

该例子是根据官网例子改造而来,相较官网更简单了些。

package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Runnable {
    private ZooKeeper zk;
    private DataMonitorListener listener;

    /**
     * 节点变化时会回调该方法,把监控变化类型及新数据带过来
     */
    public interface DataMonitorListener {
        void process(WatchedEvent event, byte[] data);
    }

    public DataMonitor(String hostPort, String znode, DataMonitorListener listener) throws Exception {
        this.listener = listener;

        AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat);
                switch (rc) {
                    case KeeperException.Code.Ok:
                    case KeeperException.Code.NoNode:
                        return;
                    case KeeperException.Code.SessionExpired:
                    case KeeperException.Code.NoAuth:
                        close();
                        return;
                    default:
                        zk.exists(znode, true, this, null);
                        return;
                }
            }
        };
        //监视器
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event);
                if (event.getType() == Event.EventType.None) {
                    switch (event.getState()) {
                        case SyncConnected:
                            break;
                        case Expired:
                            close();
                            break;
                    }
                } else {
                    try {
                        byte[] bytes = zk.getData(event.getPath(), false, null);
                        listener.process(event, bytes);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    if (event.getPath() != null && event.getPath().equals(znode)) {
                        //再次监控
                        zk.exists(znode, true, callback, null);
                    }
                }
            }
        };

        zk = new ZooKeeper("10.49.196.10:2181", 20000, watcher);
        zk.exists(znode, true, callback, null);
    }

    @Override
    public void run() {
        try {
            synchronized (this) {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void close() {
        synchronized (this) {
            notifyAll();
        }
    }
}
package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;

/**
 * 监视节点样例
 */
public class WatcherCase {
    public static void main(String[] args) throws Exception {
        DataMonitor.DataMonitorListener listener = new DataMonitor.DataMonitorListener() {
            @Override
            public void process(WatchedEvent event, byte[] data) {
                //todo:根据实际情况处理
                if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    System.out.println(new String(data));
                }
            }
        };
        new DataMonitor("10.49.196.10:2181", "/watchtest", listener).run();
    }
}

相关推荐