liben00 2013-05-07
废话少说直接上代码!
import java.net.InetAddress; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ClusterTest { static ZooKeeper zk = null; static Map<String, Watcher> myWatchMap = new HashMap<String, Watcher>(); static String ip = null; public static void init() throws InterruptedException { for(int i = 1; i < 6; i++){ final int k = i; Thread t = new Thread(new Runnable() { @Override public void run() { // 创建一个与服务器的连接 try { ZooKeeper zk = new ZooKeeper( "localhost:2181,localhost:2182", 1000, null); String path = ip+k; if(zk.exists(path, false) == null){ zk.create(path, "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } Thread.sleep(1000); zk.close(); } catch (Exception e) { // e.printStackTrace(); } } }); t.start(); t.setName("RISHENG-THREAD-"+i); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { InetAddress addr = InetAddress.getLocalHost(); ip="/zookeeper/"+addr.getHostAddress(); // 创建一个与服务器的连接 zk = new ZooKeeper("localhost:2181,localhost:2182", 1000, null); List<String> nodeList = zk.getChildren("/zookeeper", false); System.err.println(nodeList); init(); regWatch(); while(true){ Thread.sleep(1000); init(); regWatch(); } } public static void regWatch() throws KeeperException, InterruptedException{ for(int i = 1; i < 6; i++){ String path = ip+i; Watcher watch = null; if(!myWatchMap.containsKey(path)){ watch = new MyWatch(zk, path); myWatchMap.put(path, watch); } zk.exists(path, watch); } } } class MyWatch implements Watcher{ private ZooKeeper zk = null; private String path = null; public MyWatch(ZooKeeper zk, String path){ this.zk = zk; this.path = path; } @Override public void process(WatchedEvent event) { if(EventType.NodeDeleted.equals(event.getType())){ System.err.println(event.getPath()+"====服务关闭===="+path); } else if(EventType.NodeCreated.equals(event.getType())){ System.out.println(event.getPath()+"====服务开启===="+path); } try { zk.exists(event.getPath(), this); } catch (KeeperException e) { } catch (InterruptedException e) { } } }
另外有几点需要关注:
1、如果重复的new新的Watcher会导致订阅端重复收到通知;
2、zk的Watcher是一次性消费,用完后记得重新注册;
3、zk不保证通知关系始终简历,所以需要轮询注册Watcher