java一些常用并发工具示例

稀土 2018-03-09

最近把《java并发编程实战》-Java Consurrency in Practice 重温了一遍,把书中提到的一些常用工具记录于此:

一、闭锁(门栓)-CountDownLatch

适用场景:多线程测试时,通常为了精确计时,要求所有线程都ready后,才开始执行,防止有线程先起跑,造成不公平,类似的,所有线程执行完,整个程序才算运行完成。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

/**

* 闭锁测试(菩提树下的杨过 http://yjmyzz.cnblogs.com/)

*

* @throws InterruptedException

*/

@Test

publicvoidcountdownLatch()throwsInterruptedException {

CountDownLatch startLatch =newCountDownLatch(1);//类似发令枪

CountDownLatch endLatch =newCountDownLatch(10);//这里的数量,要与线程数相同

for(inti =0; i <10; i++) {

Thread t =newThread(() -> {

try{

startLatch.await();//先等着,直到发令枪响,防止有线程先run

System.out.println(Thread.currentThread().getName() +" is running...");

Thread.sleep(10);

}catch(InterruptedException e) {

Thread.currentThread().interrupt();

}finally{

endLatch.countDown();//每个线程执行完成后,计数

}

});

t.setName("线程-"+ i);

t.start();

}

longstart = System.currentTimeMillis();

startLatch.countDown();//发令枪响,所有线程『开跑』

endLatch.await();//等所有线程都完成

longend = System.currentTimeMillis();

System.out.println("done! exec time => "+ (end - start) +" ms");

}

执行结果:

线程-1 is running...
线程-5 is running...
线程-8 is running...
线程-4 is running...
线程-3 is running...
线程-0 is running...
线程-2 is running...
线程-9 is running...
线程-7 is running...
线程-6 is running...
done! exec time => 13 ms

注:大家可以把第14行注释掉,再看看运行结果有什么不同。

二、信号量(Semaphore)

适用场景:用于资源数有限制的并发访问场景。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

publicclassBoundedHashSet<T> {

privatefinalSet<T> set;

privatefinalSemaphore semaphore;

publicBoundedHashSet(intbound) {

this.set = Collections.synchronizedSet(newHashSet<T>());

this.semaphore =newSemaphore(bound);

}

publicbooleanadd(T t)throwsInterruptedException {

if(!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {

returnfalse;

}

;

booleanadded =false;

try{

added = set.add(t);

returnadded;

}finally{

if(!added) {

semaphore.release();

}

}

}

publicbooleanremove(Object o) {

booleanremoved = set.remove(o);

if(removed) {

semaphore.release();

}

returnremoved;

}

}

@Test

publicvoidsemaphoreTest()throwsInterruptedException {

BoundedHashSet<String> set =newBoundedHashSet<>(5);

for(inti =0; i <6; i++) {

if(set.add(i +"")) {

System.out.println(i +" added !");

}else{

System.out.println(i +" not add to Set!");

}

}

}

上面的示例将一个普通的Set变成了有界容器。执行结果如下:

0 added !
1 added !
2 added !
3 added !
4 added !
5 not add to Set!

三、栅栏CyclicBarrier

这个跟闭锁类似,可以通过代码设置一个『屏障』点,其它线程到达该点后才能继续,常用于约束其它线程都到达某一状态后,才允许做后面的事情。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

publicclassWorkerextendsThread {

privateCyclicBarrier cyclicBarrier;

publicWorker(CyclicBarrier cyclicBarrier) {

this.cyclicBarrier = cyclicBarrier;

}

privatevoidstep1() {

System.out.println(this.getName() +" step 1 ...");

}

privatevoidstep2() {

System.out.println(this.getName() +" step 2 ...");

}

publicvoidrun() {

step1();

try{

cyclicBarrier.await();

}catch(InterruptedException e) {

e.printStackTrace();

}catch(BrokenBarrierException e) {

e.printStackTrace();

}

step2();

}

}

@Test

publicvoidcyclicBarrierTest()throwsInterruptedException, BrokenBarrierException {

CyclicBarrier cyclicBarrier =newCyclicBarrier(11);

for(inti =0; i <10; i++) {

Worker w =newWorker(cyclicBarrier);

w.start();

}

cyclicBarrier.await();

}

这里我们假设有一个worder线程,里面有2步操作,要求所有线程完成step1后,才能继续step2.执行结果如下:

Thread-0 step 1 ...
Thread-1 step 1 ...
Thread-2 step 1 ...
Thread-3 step 1 ...
Thread-4 step 1 ...
Thread-5 step 1 ...
Thread-6 step 1 ...
Thread-7 step 1 ...
Thread-8 step 1 ...
Thread-9 step 1 ...
Thread-9 step 2 ...
Thread-0 step 2 ...
Thread-3 step 2 ...
Thread-4 step 2 ...
Thread-6 step 2 ...
Thread-2 step 2 ...
Thread-1 step 2 ...
Thread-8 step 2 ...
Thread-7 step 2 ...
Thread-5 step 2 ...

四、Exchanger

如果2个线程需要交换数据,Exchanger就能派上用场了,见下面的示例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

@Test

publicvoidexchangerTest() {

Exchanger<String> exchanger =newExchanger<>();

Thread t1 =newThread(() -> {

String temp ="AAAAAA";

System.out.println("thread 1 交换前:"+ temp);

try{

temp = exchanger.exchange(temp);

}catch(InterruptedException e) {

e.printStackTrace();

}

System.out.println("thread 1 交换后:"+ temp);

});

Thread t2 =newThread(() -> {

String temp ="BBBBBB";

System.out.println("thread 2 交换前:"+ temp);

try{

temp = exchanger.exchange(temp);

}catch(InterruptedException e) {

e.printStackTrace();

}

System.out.println("thread 2 交换后:"+ temp);

});

t1.start();

t2.start();

}

执行结果:

thread 1 交换前:AAAAAA
thread 2 交换前:BBBBBB
thread 2 交换后:AAAAAA
thread 1 交换后:BBBBBB

五、FutureTask/Future

一些很耗时的操作,可以用Future转化成异步,不阻塞后续的处理,直到真正需要返回结果时调用get拿到结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

@Test

publicvoidfutureTaskTest()throwsExecutionException, InterruptedException, TimeoutException {

Callable<String> callable = () -> {

System.out.println("很耗时的操作处理中。。。");

Thread.sleep(5000);

return"done";

};

FutureTask<String> futureTask =newFutureTask<>(callable);

System.out.println("就绪。。。");

newThread(futureTask).start();

System.out.println("主线程其它处理。。。");

System.out.println(futureTask.get());

System.out.println("处理完成!");

System.out.println("-----------------");

System.out.println("executor 就绪。。。");

ExecutorService executorService = Executors.newSingleThreadExecutor();

Future<String> future = executorService.submit(callable);

System.out.println(future.get(10, TimeUnit.SECONDS));

}

执行结果:

就绪。。。
主线程其它处理。。。
很耗时的操作处理中。。。
done
处理完成!
-----------------
executor 就绪。。。
很耗时的操作处理中。。。
done

六、阻塞队列BlockingQueue

阻塞队列可以在线程间实现生产者-消费者模式。比如下面的示例:线程producer模拟快速生产数据,而线程consumer模拟慢速消费数据,当达到队列的上限时(即:生产者产生的数据,已经放不下了),队列就堵塞住了。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

@Test

publicvoidblockingQueueTest()throwsInterruptedException {

finalBlockingQueue<String> blockingDeque =newArrayBlockingQueue<>(5);

Thread producer =newThread() {

publicvoidrun() {

Random rnd =newRandom();

while(true) {

try{

inti = rnd.nextInt(10000);

blockingDeque.put(i +"");

System.out.println(this.getName() +" 产生了一个数字:"+ i);

Thread.sleep(rnd.nextInt(50));//模拟生产者快速生产

}catch(InterruptedException e) {

Thread.currentThread().interrupt();

}

}

}

};

producer.setName("producer 1");

Thread consumer =newThread() {

publicvoidrun() {

while(true) {

Random rnd =newRandom();

try{

String i = blockingDeque.take();

System.out.println(this.getName() +" 消费了一个数字:"+ i);

Thread.sleep(rnd.nextInt(10000));//消费者模拟慢速消费

}catch(InterruptedException e) {

Thread.currentThread().interrupt();

}

}

}

};

consumer.setName("consumer 1");

producer.start();

consumer.start();

while(true) {

Thread.sleep(100);

}

}

执行结果:

producer 1 产生了一个数字:6773
consumer 1 消费了一个数字:6773
producer 1 产生了一个数字:4456
producer 1 产生了一个数字:8572
producer 1 产生了一个数字:5764
producer 1 产生了一个数字:2874
producer 1 产生了一个数字:780 # 注意这里就已经堵住了,直到有消费者消费一条数据,才能继续生产
consumer 1 消费了一个数字:4456
producer 1 产生了一个数字:4193

相关推荐