章鱼老李 2018-09-07
源码非常简单,只有一个execute(Runnable command)回调接口
public interface Executor { void execute(Runnable command); }
执行已提交的 Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法
Executor executor = anExecutor; executor.execute(new RunnableTask1()); ...
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这展示了一个复合执行程序。
class SerialExecutor implements Executor { final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
该接口提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。先看下源码
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Executors类为创建ExecutorService提供了便捷的工厂方法。它只有一个直接实现类ThreadPoolExecutor和间接实现类ScheduledThreadPoolExecutor。
ExecutorService在Executor的基础上增加了一些方法,其中有两个核心的方法:
1、Future<?> submit(Runnable task)
2、<T> Future<T> submit(Callable<T> task)
通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法submit扩展了基本方法
Executor.execute(java.lang.Runnable)。
下面对ExecutorService的函数进行一下简单介绍:
ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个线程池之一执行每个提交的任务,通常使用 Executors 工厂方法配置。
线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。
每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。
对于核心的几个线程池,无论是 newFixedThreadPool()方法、 newSingleThreadExecutor()还是 newCachedThreadPool()方法, 虽然看起来创建的线程有着完全不同的功能特点, 但其内部实现均使用了 ThreadPoolExecutor实现。 下面给出了这三个线程池的实现方式:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, //使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待 new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
由以上线程池的实现代码可以看到, 它们都只是 ThreadPoolExecutor 类的封装 。 为何ThreadPoolExecutor有如此强大的功能呢? 来看一下 ThreadPoolExecutor最重要的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) //后两个参数为可选参数
参数说明:
ThreadPoolExecutor将根据corePoolSize和 maximumPoolSize设置的边界自动调整线程池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的; 如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程;如果设置的corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池;如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。
在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。