JUC——线程同步辅助工具类(Semaphore,CountDownLatch,CyclicBarrier)

稀土 2018-05-11

锁的机制从整体的运行转态来讲核心就是:阻塞,解除阻塞,但是如果仅仅是这点功能,那么JUC并不能称为一个优秀的线程开发框架,然而是因为在juc里面提供了大量方便的同步工具辅助类。

Semaphore信号量

Semaphore通常用于限制可以访问某些资源(物理or逻辑)的线程数目。

例如,大家排队去银行办理业务,但是只有两个银行窗口提供服务,来了10个人需要办理业务,所以这10个排队的人员需要依次使用这两个业务窗口来办理业务。

观察Semaphore类的基本定义:

public class Semaphore extends Object implements Serializable

Semaphore类中定义的方法有如下几个:

  • 构造方法:
    public Semaphore(int premits),设置服务的信号量;
  • 构造方法:
    public Semaphore(int premits,boolean fair) ,是否为公平锁;
  • 等待执行:
    public void acquireUninterruptibly(int permits)
    • 设置的信号量上如果有阻塞的线程对象存在,那么将一直持续阻塞状态。
  • 释放线程的阻塞状态:
    public void release(int permits);
  • 返回可用的资源个数:
    public int availablePermits();

范例:实现银行排队业务办理

package so.strong.mall.concurrent;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(2); //现在允许操作的资源一共有2个
        final Random random = new Random(); //模拟每一个用户办理业务的时间
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() { //每一个线程就是一个要办理业务的人员
                    if (semaphore.availablePermits() > 0) { //现有空余窗口
                        System.out.println("[" + Thread.currentThread().getName() + "]进入银行,没有人办理业务");
                    } else { //没有空余位置
                        System.out.println("[" + Thread.currentThread().getName() + "]排队等候办理业务");
                    }
                    try {
                        semaphore.acquire(); //从信号量中获得操作许可
                        System.out.println("[" + Thread.currentThread().getName() + "]开始办理业务");
                        TimeUnit.SECONDS.sleep(random.nextInt(10)); //模拟办公延迟
                        System.out.println("[" + Thread.currentThread().getName() + "]结束业务办理");
                        semaphore.release(); //当前线程离开办公窗口
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "顾客-" + i).start();
        }
    }
}
[顾客-0]进入银行,没有人办理业务
[顾客-0]开始办理业务
[顾客-1]进入银行,没有人办理业务
[顾客-1]开始办理业务
[顾客-2]排队等候办理业务
[顾客-3]排队等候办理业务
[顾客-4]排队等候办理业务
[顾客-5]排队等候办理业务
[顾客-6]排队等候办理业务
[顾客-7]排队等候办理业务
[顾客-8]排队等候办理业务
[顾客-9]排队等候办理业务
[顾客-0]结束业务办理
[顾客-2]开始办理业务
[顾客-1]结束业务办理
[顾客-3]开始办理业务
[顾客-2]结束业务办理
[顾客-4]开始办理业务
[顾客-3]结束业务办理
[顾客-5]开始办理业务
[顾客-4]结束业务办理
[顾客-6]开始办理业务
[顾客-5]结束业务办理
[顾客-7]开始办理业务
[顾客-7]结束业务办理
[顾客-8]开始办理业务
[顾客-6]结束业务办理
[顾客-9]开始办理业务
[顾客-8]结束业务办理

这种信号量的处理在实际开发中有什么用呢?例如,现在对于数据库的连接一共有2个连接,那么可能有10个用户等待进行数据库操作,能够使用的连接个数为2个,这样10个用户就需要排队依次使用这两个连接来进行数据库操作。

CountDownLatch闭锁

CoundDownLatch描述的是一个计数的减少,下面首先观察一个程序的简单问题:

范例:编写一个简单的多线程开发

package so.strong.mall.concurrent;
public class CountDownDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("["+Thread.currentThread().getName()+"]线程应用执行完毕");
                }
            },"线程对象-"+i).start();
        }
        System.out.println("[***主线程***]所有的程序执行完毕");
    }
}
[***主线程***]所有的程序执行完毕
[线程对象-1]线程应用执行完毕
[线程对象-0]线程应用执行完毕  

现在可以发现,对于此时应该保证所有的线程执行完毕后在执行程序的输出计算,就好比:旅游团集合人员乘车离开。应该保证所有的线程都执行完毕了(指定个数的线程),这样的话就必须做一个计数处理。

CoundDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CoundDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

JUC——线程同步辅助工具类(Semaphore,CountDownLatch,CyclicBarrier)

CoundDownLatch类之中的常用方法有如下几个:

  • 构造方法:
    public CountDownLatch(int count);  //要设置一个等待的线程个数;
  • 减少等待个数:

    public void countDown();  
  • 等待countDownLatch为0:

    public void await() throws InterruptedException;

范例:利用CountDownLatch解决之前的设计问题

package so.strong.mall.concurrent;
import java.util.concurrent.CountDownLatch;

public class CountDownDemo {
    public static void main(String[] args) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(2); //2个线程全部执行完毕后可继续执行
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("[" + Thread.currentThread().getName() + "]线程应用执行完毕");
                    countDownLatch.countDown(); //减少等待的线程个数
                }
            }, "线程对象-" + i).start();
        }
        countDownLatch.await(); //等待计数的结束(个数为0)
        System.out.println("[***主线程***]所有的程序执行完毕");
    }
}
[线程对象-0]线程应用执行完毕
[线程对象-1]线程应用执行完毕
[***主线程***]所有的程序执行完毕

CyclicBarrier栅栏

CyclicBarrierCountDownLatch是非常类似的,CyclicBarrier核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。

CyclicBarrier类是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(Common Barrier Point)。

CyclicBarrier类是一种同步机制,它能够对处理一些算法的线程实现同。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

通过调用CyclicBarrier对象的await()方法,两个线程可以实现互相等待。一旦N个线程在等待CyclicBarrier达成,所有线程将被释放掉去继续执行。

CyclicBarrier类的主要方法如下:

  • 构造方法:
    public CyclicBarrier(int parties);//设置等待的边界;
  • 傻傻的等待其他线程:
    public int await() throws InterruptedException, BrokenBarrierException;
  • 等待其他线程:
    public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException;
  • 重置等待的线程个数:
    public void reset();

范例:观察CyclicBarrier进行等待处理

package so.strong.mall.concurrent;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo {
    public static void main(String[] args) throws Exception{
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); //当凑够2个线程金进行触发
        for (int i = 0; i < 3 ; i++) {
            final int second = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("["+Thread.currentThread().getName()+"]-等待开始");
                    try {
                        TimeUnit.SECONDS.sleep(second);
                        cyclicBarrier.await(); //等待处理
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("["+Thread.currentThread().getName()+"]-等待结束");
                }
            },"娱乐者-"+i).start();
        }
    }
}
[娱乐者-0]-等待开始
[娱乐者-1]-等待开始
[娱乐者-2]-等待开始
[娱乐者-1]-等待结束
[娱乐者-0]-等待结束

如果不想一直等待则可以设置超时时间,则超过了等待时间之后将会出现"TimeoutException"。

package so.strong.mall.concurrent;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo {
    public static void main(String[] args) throws Exception{
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); //当凑够2个线程金进行触发
        for (int i = 0; i < 3 ; i++) {
            final int second = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("["+Thread.currentThread().getName()+"]-等待开始");
                    try {
                        TimeUnit.SECONDS.sleep(second);
                       <strong> cyclicBarrier.await(</strong><strong>6,TimeUnit.SECONDS); //等待处理</strong>
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("["+Thread.currentThread().getName()+"]-等待结束");
                }
            },"娱乐者-"+i).start();
        }
    }
}
[娱乐者-0]-等待开始
[娱乐者-1]-等待开始
[娱乐者-2]-等待开始
[娱乐者-1]-等待结束
[娱乐者-0]-等待结束
Disconnected from the target VM, address: '127.0.0.1:63717', transport: 'socket'
[娱乐者-2]-等待结束
java.util.concurrent.TimeoutException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
    at so.strong.mall.concurrent.CyclicBarrierDemo$1.run(CyclicBarrierDemo.java:21)
    at java.lang.Thread.run(Thread.java:745)

CyclicBarrier还有一个特点是可以进行重置处理

范例:重置处理

package so.strong.mall.concurrent;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;<br /><br />
public class CyclicBarrierResetDemo {
    public static void main(String[] args) throws Exception {
        final CyclicBarrier cb = new CyclicBarrier(2); //当凑够2个线程就进行触发
        for (int i = 0; i < 3; i++) {
            final int second = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("[" + Thread.currentThread().getName() + "]-等待开始");
                    try {
                        if (second == 2) {
                            cb.reset(); //重置
                            System.out.println("[重置处理****]" + Thread.currentThread().getName());
                        } else {
                            TimeUnit.SECONDS.sleep(second);
                            cb.await(6,TimeUnit.SECONDS);//等待处理
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("[" + Thread.currentThread().getName() + "]-等待结束");
                }
            }, "娱乐者-" + i).start();
        }
    }
}
[娱乐者-0]-等待开始
[娱乐者-1]-等待开始
[娱乐者-2]-等待开始
[重置处理****]娱乐者-2
[娱乐者-2]-等待结束
[娱乐者-1]-等待结束
[娱乐者-0]-等待结束

CountDownLatch与CyclicBarrier的区别

  • CountDownLatch最大的特征是进行一个数据减法的操作等待,所有的统计操作一旦开始之中就必须执行countDown()方法,如果等待个数不是0,就被一只等待,并且无法重置。
  • CyclicBarrier设置一个等待的临界点,并且可以有多个等待线程出现,只要满足了临界点就触发了线程的执行代码后将重新开始进行计数处理操作,也可以直接利用reset()方法执行重置操作。

相关推荐