最近把《java并发编程实战》-Java Consurrency in Practice 重温了一遍,把书中提到的一些常用工具记录于此:
一、闭锁(门栓)-CountDownLatch
适用场景:多线程测试时,通常为了精确计时,要求所有线程都ready后,才开始执行,防止有线程先起跑,造成不公平,类似的,所有线程执行完,整个程序才算运行完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | /**
* 闭锁测试(菩提树下的杨过 http://yjmyzz.cnblogs.com/)
*
* @throws InterruptedException
*/
@Test
publicvoidcountdownLatch()throwsInterruptedException {
CountDownLatch startLatch =newCountDownLatch(1);//类似发令枪
CountDownLatch endLatch =newCountDownLatch(10);//这里的数量,要与线程数相同
for(inti =0; i <10; i++) {
Thread t =newThread(() -> {
try{
startLatch.await();//先等着,直到发令枪响,防止有线程先run
System.out.println(Thread.currentThread().getName() +" is running...");
Thread.sleep(10);
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
}finally{
endLatch.countDown();//每个线程执行完成后,计数
}
});
t.setName("线程-"+ i);
t.start();
}
longstart = System.currentTimeMillis();
startLatch.countDown();//发令枪响,所有线程『开跑』
endLatch.await();//等所有线程都完成
longend = System.currentTimeMillis();
System.out.println("done! exec time => "+ (end - start) +" ms");
}
|
执行结果:
线程-1 is running...
线程-5 is running...
线程-8 is running...
线程-4 is running...
线程-3 is running...
线程-0 is running...
线程-2 is running...
线程-9 is running...
线程-7 is running...
线程-6 is running...
done! exec time => 13 ms
注:大家可以把第14行注释掉,再看看运行结果有什么不同。
二、信号量(Semaphore)
适用场景:用于资源数有限制的并发访问场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | publicclassBoundedHashSet<T> {
privatefinalSet<T> set;
privatefinalSemaphore semaphore;
publicBoundedHashSet(intbound) {
this.set = Collections.synchronizedSet(newHashSet<T>());
this.semaphore =newSemaphore(bound);
}
publicbooleanadd(T t)throwsInterruptedException {
if(!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
returnfalse;
}
;
booleanadded =false;
try{
added = set.add(t);
returnadded;
}finally{
if(!added) {
semaphore.release();
}
}
}
publicbooleanremove(Object o) {
booleanremoved = set.remove(o);
if(removed) {
semaphore.release();
}
returnremoved;
}
}
@Test
publicvoidsemaphoreTest()throwsInterruptedException {
BoundedHashSet<String> set =newBoundedHashSet<>(5);
for(inti =0; i <6; i++) {
if(set.add(i +"")) {
System.out.println(i +" added !");
}else{
System.out.println(i +" not add to Set!");
}
}
}
|
上面的示例将一个普通的Set变成了有界容器。执行结果如下:
0 added !
1 added !
2 added !
3 added !
4 added !
5 not add to Set!
三、栅栏CyclicBarrier
这个跟闭锁类似,可以通过代码设置一个『屏障』点,其它线程到达该点后才能继续,常用于约束其它线程都到达某一状态后,才允许做后面的事情。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | publicclassWorkerextendsThread {
privateCyclicBarrier cyclicBarrier;
publicWorker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
privatevoidstep1() {
System.out.println(this.getName() +" step 1 ...");
}
privatevoidstep2() {
System.out.println(this.getName() +" step 2 ...");
}
publicvoidrun() {
step1();
try{
cyclicBarrier.await();
}catch(InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e) {
e.printStackTrace();
}
step2();
}
}
@Test
publicvoidcyclicBarrierTest()throwsInterruptedException, BrokenBarrierException {
CyclicBarrier cyclicBarrier =newCyclicBarrier(11);
for(inti =0; i <10; i++) {
Worker w =newWorker(cyclicBarrier);
w.start();
}
cyclicBarrier.await();
}
|
这里我们假设有一个worder线程,里面有2步操作,要求所有线程完成step1后,才能继续step2.执行结果如下:
Thread-0 step 1 ...
Thread-1 step 1 ...
Thread-2 step 1 ...
Thread-3 step 1 ...
Thread-4 step 1 ...
Thread-5 step 1 ...
Thread-6 step 1 ...
Thread-7 step 1 ...
Thread-8 step 1 ...
Thread-9 step 1 ...
Thread-9 step 2 ...
Thread-0 step 2 ...
Thread-3 step 2 ...
Thread-4 step 2 ...
Thread-6 step 2 ...
Thread-2 step 2 ...
Thread-1 step 2 ...
Thread-8 step 2 ...
Thread-7 step 2 ...
Thread-5 step 2 ...
四、Exchanger
如果2个线程需要交换数据,Exchanger就能派上用场了,见下面的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | @Test
publicvoidexchangerTest() {
Exchanger<String> exchanger =newExchanger<>();
Thread t1 =newThread(() -> {
String temp ="AAAAAA";
System.out.println("thread 1 交换前:"+ temp);
try{
temp = exchanger.exchange(temp);
}catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread 1 交换后:"+ temp);
});
Thread t2 =newThread(() -> {
String temp ="BBBBBB";
System.out.println("thread 2 交换前:"+ temp);
try{
temp = exchanger.exchange(temp);
}catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread 2 交换后:"+ temp);
});
t1.start();
t2.start();
}
|
执行结果:
thread 1 交换前:AAAAAA
thread 2 交换前:BBBBBB
thread 2 交换后:AAAAAA
thread 1 交换后:BBBBBB
五、FutureTask/Future
一些很耗时的操作,可以用Future转化成异步,不阻塞后续的处理,直到真正需要返回结果时调用get拿到结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | @Test
publicvoidfutureTaskTest()throwsExecutionException, InterruptedException, TimeoutException {
Callable<String> callable = () -> {
System.out.println("很耗时的操作处理中。。。");
Thread.sleep(5000);
return"done";
};
FutureTask<String> futureTask =newFutureTask<>(callable);
System.out.println("就绪。。。");
newThread(futureTask).start();
System.out.println("主线程其它处理。。。");
System.out.println(futureTask.get());
System.out.println("处理完成!");
System.out.println("-----------------");
System.out.println("executor 就绪。。。");
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(callable);
System.out.println(future.get(10, TimeUnit.SECONDS));
}
|
执行结果:
就绪。。。
主线程其它处理。。。
很耗时的操作处理中。。。
done
处理完成!
-----------------
executor 就绪。。。
很耗时的操作处理中。。。
done
六、阻塞队列BlockingQueue
阻塞队列可以在线程间实现生产者-消费者模式。比如下面的示例:线程producer模拟快速生产数据,而线程consumer模拟慢速消费数据,当达到队列的上限时(即:生产者产生的数据,已经放不下了),队列就堵塞住了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | @Test
publicvoidblockingQueueTest()throwsInterruptedException {
finalBlockingQueue<String> blockingDeque =newArrayBlockingQueue<>(5);
Thread producer =newThread() {
publicvoidrun() {
Random rnd =newRandom();
while(true) {
try{
inti = rnd.nextInt(10000);
blockingDeque.put(i +"");
System.out.println(this.getName() +" 产生了一个数字:"+ i);
Thread.sleep(rnd.nextInt(50));//模拟生产者快速生产
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
};
producer.setName("producer 1");
Thread consumer =newThread() {
publicvoidrun() {
while(true) {
Random rnd =newRandom();
try{
String i = blockingDeque.take();
System.out.println(this.getName() +" 消费了一个数字:"+ i);
Thread.sleep(rnd.nextInt(10000));//消费者模拟慢速消费
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
};
consumer.setName("consumer 1");
producer.start();
consumer.start();
while(true) {
Thread.sleep(100);
}
}
|
执行结果:
producer 1 产生了一个数字:6773
consumer 1 消费了一个数字:6773
producer 1 产生了一个数字:4456
producer 1 产生了一个数字:8572
producer 1 产生了一个数字:5764
producer 1 产生了一个数字:2874
producer 1 产生了一个数字:780 # 注意这里就已经堵住了,直到有消费者消费一条数据,才能继续生产
consumer 1 消费了一个数字:4456
producer 1 产生了一个数字:4193