fuel 2017-07-26
源:http://blog.csdn.net/java2000_wl/article/details/8694270
评:
获取锁实现思路:
1.首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node
2.希望获得锁的客户端在锁目录下创建znode,作为锁/lock_node的子节点,并且节点类型为有序临时节点(EPHEMERAL_SEQUENTIAL);
例如:有两个客户端创建znode,分别为/lock_node/lock-1和/lock_node/lock-2
3.当前客户端调用getChildren(/lock_node)得到锁目录所有子节点,不设置watch,接着获取小于自己(步骤2创建)的兄弟节点
4.步骤3中获取小于自己的节点不存在&&最小节点与步骤2中创建的相同,说明当前客户端顺序号最小,获得锁,结束。
5.客户端监视(watch)相对自己次小的有序临时节点状态
6.如果监视的次小节点状态发生变化,则跳转到步骤3,继续后续操作,直到退出锁竞争。
[java]viewplaincopy
print?
publicsynchronizedbooleanlock()throwsKeeperException,InterruptedException{
if(isClosed()){
returnfalse;
}
//如果锁目录不存在,创建锁目录节点类型为永久类型
ensurePathExists(dir);
//创建锁节点,节点类型EPHEMERAL_SEQUENTIAL
//如果不存在小于自己的节点并且最小节点与当前创建的节点相同获得锁
//未获得成功,对当前次小节点设置watch
return(Boolean)retryOperation(zop);
}
创建锁目录
[java]viewplaincopy
print?
protectedvoidensurePathExists(Stringpath){
ensureExists(path,null,acl,CreateMode.PERSISTENT);
}
[java]viewplaincopy
print?
protectedvoidensureExists(finalStringpath,finalbyte[]data,
finalList<ACL>acl,finalCreateModeflags){
try{
retryOperation(newZooKeeperOperation(){
publicbooleanexecute()throwsKeeperException,InterruptedException{
//创建锁目录
Statstat=zookeeper.exists(path,false);
//节点如果存在直接返回
if(stat!=null){
returntrue;
}
//创建节点
//data为null
//flags为持久化节点
zookeeper.create(path,data,acl,flags);
returntrue;
}
});
}catch(KeeperExceptione){
LOG.warn("Caught:"+e,e);
}catch(InterruptedExceptione){
LOG.warn("Caught:"+e,e);
}
}
创建锁节点,获得锁目录下的所有节点,如果为最小节点获得锁成功
[java]viewplaincopy
print?
/**
*thecommandthatisrunandretriedforactually
*obtainingthelock
*@returnifthecommandwassuccessfulornot
*/
publicbooleanexecute()throwsKeeperException,InterruptedException{
do{
if(id==null){
longsessionId=zookeeper.getSessionId();
Stringprefix="x-"+sessionId+"-";
//letstrylookupthecurrentIDifwefailed
//inthemiddleofcreatingtheznode
findPrefixInChildren(prefix,zookeeper,dir);
idName=newZNodeName(id);
}
if(id!=null){
List<String>names=zookeeper.getChildren(dir,false);
if(names.isEmpty()){
LOG.warn("Nochildrenin:"+dir+"whenwe'vejust"+
"createdone!Letsrecreateit...");
//letsforcetherecreationoftheid
id=null;
}else{
//letssortthemexplicitly(thoughtheydoseemtocomebackinorderususally
SortedSet<ZNodeName>sortedNames=newTreeSet<ZNodeName>();
for(Stringname:names){
sortedNames.add(newZNodeName(dir+"/"+name));
}
//获得最小节点
ownerId=sortedNames.first().getName();
//lock_1,lock_2,lock_3传入参数lock_2返回lock_1
SortedSet<ZNodeName>lessThanMe=sortedNames.headSet(idName);
if(!lessThanMe.isEmpty()){
ZNodeNamelastChildName=lessThanMe.last();
lastChildId=lastChildName.getName();
if(LOG.isDebugEnabled()){
LOG.debug("watchinglessthanmenode:"+lastChildId);
}
//次小节点设置watch
Statstat=zookeeper.exists(lastChildId,newLockWatcher());
if(stat!=null){
returnBoolean.FALSE;
}else{
LOG.warn("Couldnotfindthe"+
"statsforlessthanme:"+lastChildName.getName());
}
}else{
//锁目录下的最小节点与当前客户端创建相同
if(isOwner()){
if(callback!=null){
callback.lockAcquired();
}
//获得锁
returnBoolean.TRUE;
}
}
}
}
}
while(id==null);
returnBoolean.FALSE;
}
};
[java]viewplaincopy
print?
privatevoidfindPrefixInChildren(Stringprefix,ZooKeeperzookeeper,Stringdir)
throwsKeeperException,InterruptedException{
//获取锁目录下的所有子节点
List<String>names=zookeeper.getChildren(dir,false);
for(Stringname:names){
//x-sessionId-
if(name.startsWith(prefix)){
id=name;
if(LOG.isDebugEnabled()){
LOG.debug("Foundidcreatedlasttime:"+id);
}
break;
}
}
//当前锁目录下没有与当前会话对应的子节点创建子节点节点类型为临时顺序节点
if(id==null){
//dir/x-sessionId-i
id=zookeeper.create(dir+"/"+prefix,data,
getAcl(),EPHEMERAL_SEQUENTIAL);
if(LOG.isDebugEnabled()){
LOG.debug("Createdid:"+id);
}
}
释放锁:
释放锁非常简单,删除步骤1中创建的有序临时节点。另外,如果客户端进程死亡或连接失效,对应的节点也会被删除。
[java]viewplaincopy
print?
publicsynchronizedvoidunlock()throwsRuntimeException{
if(!isClosed()&&id!=null){
//wedon'tneedtoretrythisoperationinthecaseoffailure
//asZKwillremoveephemeralfilesandwedon'twannahang
//thisprocesswhenclosingifwecannotreconnecttoZK
try{
ZooKeeperOperationzopdel=newZooKeeperOperation(){
publicbooleanexecute()throwsKeeperException,
InterruptedException{
//删除节点忽略版本
zookeeper.delete(id,-1);
returnBoolean.TRUE;
}
};
zopdel.execute();
}catch(InterruptedExceptione){
LOG.warn("Caught:"+e,e);
//setthatwehavebeeninterrupted.
Thread.currentThread().interrupt();
}catch(KeeperException.NoNodeExceptione){
//donothing
}catch(KeeperExceptione){
LOG.warn("Caught:"+e,e);
throw(RuntimeException)newRuntimeException(e.getMessage()).
initCause(e);
}
finally{
if(callback!=null){
callback.lockReleased();
}
id=null;
}
}
}