憧憬 2017-03-24
zookeeper的两种分布式锁的源代码点评
自己实现锁的思想就是
所有分布式不好关的东西可以都注册到一个功能的中间件上,然后这个中间件进行统一汇集管理
对需要锁控制先后的线程先在执行前先建立一个标记性的节点,最后根据节点的顺序,决定线程执行的先后顺序(都在同一个zk上)
这也是zk的分布式锁原理
zk锁的源码(时序锁)
每个进程连接好zk之后就安照先后顺序创建节点,然后判断该节点是不是最先序列(最先创建),是的话就先执行,其他节点在其各自等待的节点(比自己小1的节点建立监听
,一旦自己监听的节点移除(锁释放),就会回调自己的process方法,自己多要做的事情(这时不在需要再获得锁,笫一次都依次建立好了时序锁))
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 连接建立时, 打开latch, 唤醒wait在该latch上的线程
if (event.getState() == KeeperState.SyncConnected) {
latch.countDown();////保证连接好了才往下走(倒计时为0开始执行)
}
// 发生了waitPath的删除事件
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
doSomething();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 等待连接建立
latch.await();////保证连接好了才往下走
采用回调的方式实现所有锁依次执行
// 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法
zk.getData(waitPath, true, new Stat());
当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是:
1.建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT)
2.每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号。
3.在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。
4.假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。
5.当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。
实现的分布式锁是严格的按照顺序访问的并发锁。
http://blog.csdn.net/a925907195/article/details/39639357
独占锁
都去创建一个同名锁,用好了后删除,之前为创建成功的开始有创建,未得到的等待(先判断自己是不是最小的),等待超时会包超时异常,但是会不断的迭代直到获得锁
private boolean waitForLock(String lower, long waitTime, TimeUnit unit,boolean isBreak) throws InterruptedException, KeeperException {
long beginTime = System.currentTimeMillis();
Stat stat = zk.exists(root + "/" + lower,true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if(stat != null){
logger.debug("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, unit);
if(this.latch.getCount() == 1){
this.latch = null;
if(isBreak){
throw new LockException("申请锁资源默认时间是:"+defaultWaitTime+"ms,等待锁超时");
}else{
return false;
}
}
this.latch = null;
}
//取出所有lockName的锁
List<String> lockObjNodes = this.getChildrenListSort();
logger.debug(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}else{
//如果不是最小的节点,找到比自己小1的节点
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
long endTime = System.currentTimeMillis();
waitTime = waitTime - (endTime - beginTime) ;
if(waitTime <= 0) return false;
return waitForLock(lower, waitTime, unit, false);//等待锁 (迭代)
}
}
http://blog.csdn.net/qq_18167901/article/details/50681429