java进级篇:从源码理解线程池

章鱼老李 2018-09-07

java进级篇:从源码理解线程池


Executor接口

源码非常简单,只有一个execute(Runnable command)回调接口

public interface Executor {
 void execute(Runnable command);
}

执行已提交的 Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
 ...

不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:

class DirectExecutor implements Executor {
 public void execute(Runnable r) {
 r.run();
 }
 }

更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {
 public void execute(Runnable r) {
 new Thread(r).start();
 }
 }

许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这展示了一个复合执行程序。

class SerialExecutor implements Executor {
 final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
 final Executor executor;
 Runnable active;
 
 SerialExecutor(Executor executor) {
 this.executor = executor;
 }
 
 public synchronized void execute(final Runnable r) {
 tasks.offer(new Runnable() {
 public void run() {
 try {
 r.run();
 } finally {
 scheduleNext();
 }
 }
 });
 if (active == null) {
 scheduleNext();
 }
 }
 
 protected synchronized void scheduleNext() {
 if ((active = tasks.poll()) != null) {
 executor.execute(active);
 }
 }
 }

ExecutorService接口

该接口提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。先看下源码

public interface ExecutorService extends Executor {
 void shutdown();
 List<Runnable> shutdownNow();
 boolean isShutdown();
 boolean isTerminated();
 boolean awaitTermination(long timeout, TimeUnit unit)
 throws InterruptedException;
 <T> Future<T> submit(Callable<T> task);
 <T> Future<T> submit(Runnable task, T result);
 Future<?> submit(Runnable task);
 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;
 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException;
 
 <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 throws InterruptedException, ExecutionException;
 
 <T> T invokeAny(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;
}

Executors类为创建ExecutorService提供了便捷的工厂方法。它只有一个直接实现类ThreadPoolExecutor和间接实现类ScheduledThreadPoolExecutor。

ExecutorService在Executor的基础上增加了一些方法,其中有两个核心的方法:

1、Future<?> submit(Runnable task)

2、<T> Future<T> submit(Callable<T> task)

通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法submit扩展了基本方法

Executor.execute(java.lang.Runnable)。

下面对ExecutorService的函数进行一下简单介绍:

  • void shutdown():启动一个关闭命令,不再接受新任务,当所有已提交任务执行完后,就关闭。
  • List<Runnable> shutdownNow():试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。它无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。应该关闭未使用的 ExecutorService以允许回收其资源。
  • boolean isShutdown():如果此执行程序已关闭,则返回 true。
  • boolean isTerminated():如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
  • boolean awaitTermination(long timeout,TimeUnit unit) :如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false
  • <T> Future<T> submit(Callable<T> task):提交一个有返回值的任务用于执行,返回一个表示任务结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
  • <T> Future<T> submit(Runnable task,T result):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
  • Future<?> submit(Runnable task):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回 null。
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。注意,该方法会一直阻塞直到所有任务完成。可以正常地或通过抛出异常来终止已完成任务。如果正在进行此操作时修改了给定的collection,则此方法的结果是不确定的。

ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个线程池之一执行每个提交的任务,通常使用 Executors 工厂方法配置。

线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。

每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。

对于核心的几个线程池,无论是 newFixedThreadPool()方法、 newSingleThreadExecutor()还是 newCachedThreadPool()方法, 虽然看起来创建的线程有着完全不同的功能特点, 但其内部实现均使用了 ThreadPoolExecutor实现。 下面给出了这三个线程池的实现方式:

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
 new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
}

由以上线程池的实现代码可以看到, 它们都只是 ThreadPoolExecutor 类的封装 。 为何ThreadPoolExecutor有如此强大的功能呢? 来看一下 ThreadPoolExecutor最重要的构造函数:

public ThreadPoolExecutor(int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler) //后两个参数为可选参数

参数说明:

  • corePoolSize:指定了线程池中的核心线程数
  • maximumPoolSize:最大可允许创建的线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:corePoolSize=运行的线程数= maximumPoolSize:创建固定大小的线程池
  • keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止
  • unit:keepAliveTime参数的时间单位
  • workQueue:保存任务的阻塞队列,被提交但尚未执行的任务,与线程池的大小有关:当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列; 当运行的线程数等于或多于corePoolSize,在有新任务添加时则先加入队列,不直接创建线程; 当队列满时,在有新任务时就创建新线程
  • threadFactory:线程工厂,用于创建新线程,默认使用defaultThreadFactory创建线程
  • handle:定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException

ThreadPoolExecutor将根据corePoolSize和 maximumPoolSize设置的边界自动调整线程池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的; 如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程;如果设置的corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池;如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。

在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

java进级篇:从源码理解线程池

java进级篇:从源码理解线程池

相关推荐