KingfuL 2019-10-27
使用InterProcessMutex实现
InterProcessMutex基于Zookeeper实现了分布式的公平可重入互斥锁,类似于单个JVM进程内的ReentrantLock
1.初始化InterProcessMutex
private static InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
2.获取锁
//获得了锁 public static boolean acquire(long time, TimeUnit unit){ try { return mutex.acquire(time,unit); } catch (Exception e) { e.printStackTrace(); return false; } }
3.释放锁
//释放锁 public static void release(){ try { mutex.release(); } catch (Exception e) { e.printStackTrace(); } }
测试:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; /** * 基于curator的zookeeper分布式锁 * 这里我们开启5个线程,每个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。 * */ public class CuratorUtil { private static String address = "192.168.12.101:2181"; public static void main(String[] args) { //1、重试策略:初试时间为1s 重试3次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //2、通过工厂创建连接 CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy); //3、开启连接 client.start(); //4 分布式锁 final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock"); //读写锁 //InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter"); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { fixedThreadPool.submit(new Runnable() { @Override public void run() { boolean flag = false; try { //尝试获取锁,最多等待5秒 flag = mutex.acquire(5, TimeUnit.SECONDS); Thread currentThread = Thread.currentThread(); if(flag){ System.out.println("线程"+currentThread.getId()+"获取锁成功"); }else{ System.out.println("线程"+currentThread.getId()+"获取锁失败"); } //模拟业务逻辑,延时4秒 Thread.sleep(4000); } catch (Exception e) { e.printStackTrace(); } finally{ if(flag){ try { mutex.release(); } catch (Exception e) { e.printStackTrace(); } } } } }); } } }
这里我们开启5个线程,每个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。开始执行main方法,通过ZooInspector监控/curator/lock下的节点如下图:
对,没错,设置4秒的业务处理时长就是为了观察生成了几个顺序节点。果然如案例中所述,每个线程都会生成一个节点并且还是有序的。观察控制台,我们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕我们刷新一下/curator/lock节点,发现刚才创建的五个子节点已经不存在了。