少年阿涛 2020-01-12
线程池是池化技术的一种,对线程复用、资源回收、多任务执行有不错的实践。阅读源码,可以学习jdk的大师对于线程并发是怎么池化的,还有一些设计模式。同时,它也能给我们在使用它的时候多一种感知,出了什么问题可以马上意识到哪里的问题。
我们使用一个线程池,直接通过jdk提供的工具类直接创建。使用如下api创建一个固定线程数的线程池。
ExecutorService pool = Executors.newFixedThreadPool(5);
使用如下api创建一个会不断增长线程的线程池。
ExecutorService pool = Executors.newCachedThreadPool();
使用如下api创建定时任务线程池。
ExecutorService pool = Executors.newScheduledThreadPool(1);
使用如下api创建单线程的线程池。
ExecutorService pool = Executors.newSingleThreadExecutor();
执行线程任务的代码示例。
pool.execute(()->{ System.out.println(Thread.currentThread().getName()+" execute"); }); pool.shutdown();
或者使用Future模式。
Future<String> future = pool.submit(() -> { System.out.println(Thread.currentThread().getName()+" execute !"); return "SUCCESS"; }); String s = future.get(); System.out.println(s);
处理线程的异常,使用如下几种方式。重写ThreadFactory的未捕获异常的handler,处理异常。
ExecutorService pool = Executors.newFixedThreadPool(5,r -> { Thread t = new Thread(r); t.setUncaughtExceptionHandler((t1,e)->{ System.out.println("线程 :"+t1.getName()+",抛出异常:"+e.getMessage()); e.printStackTrace(); }); return t; });
继承线程池,重写afterHandler方法。
class ExecutorsExtend extends ThreadPoolExecutor{ public ExecutorsExtend(int nThreads) { super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); System.out.println("after handler invoke!!"); try{ if(r instanceof Future<?>){ Object res = ((Future) r).get(); } }catch (Exception e){ e.printStackTrace(); } if(t != null){ t.printStackTrace(); } } }
使用future方式捕获异常。
Future future = pool.submit(()->{ System.out.println(Thread.currentThread().getName()+" executes !"); Object obj = null; System.out.println(obj.toString()); }); try{ future.get(); }catch (Exception e){ System.out.println("future 方式捕获异常"); e.printStackTrace(); }
线程池的内部机制是在控制线程数以及阻塞队列的关系实现的,它的核心参数:核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、存活时间(keepAliveTime)、阻塞队列(workQueue)。
核心线程数,表示任务数只要少于这个值就一股脑儿地创建线程;最大线程数限制在阻塞队列无法存储任务的时候可以扩容的线城最大数目;存活时间就是一个线程能在空闲的时候存活多久的意思;阻塞队列,是存储任务的一个任务队列。
它的构造方法有很详细的说明。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory and rejected execution handler. * It may be more convenient to use one of the {@link Executors} factory * methods instead of this general purpose constructor. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
我们着重看它的execute方法。
这里它执行每个关键步骤的时候都会去校验线程池的状态,是否关闭,线程数是否超过最大的限制等等。
第一个if这里判断的是,线程数是否达到核心线程数,如果没达到就会去创建一个新的线程接受任务。
第二个if这里判断的是,是否能放入阻塞队列,如果放入之后,其他已创建的线程执行完当前它的任务之后会去拉取阻塞队列中的任务继续执行;在成功放入阻塞队列中之后,还会继续校验线程池是否关闭,如果关闭则执行拒绝策略;接下来校验正在运行的线程数,是否为0,如果为0则增加一个没有当前任务的线程,去清空阻塞队列中的任务。
最后一个条件这里,是尝试添加一个非核心线程,携带的是当前提交的任务,这是在阻塞队列添加失败的情况下才会出现;如果最后这个条件失败的话,就会触发拒绝策略拒绝接受任务。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn‘t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
所以我们得出一个简单的结论,在添加线程少于核心线程数的时候会一直创建线程接受任务,大于核心线程数之后,就会添加到阻塞队列中缓存任务,一旦阻塞队列满了,就会创建非核心线程接受任务;如果都满了,则会拒绝接受任务。
接下来我们看addWorker方法。它分为两部分,因为中间含有很多状态判断,所以变得比较难看懂。大体的意思是,先判断是否合规,合规之后创建线程并加入线程集合,最后跑线程。
private boolean addWorker(Runnable firstTask, boolean core) { // 这里主要判断是否达到核心线程数或最大线程数 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 这里主要是添加Worker,也就是新建线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 在这里运行线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker这个对象是继承自AQS和线程任务接口Runnable的一个内部类。它执行任务的方法就是这个runWorker方法。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
这个runWorker方法,就是一个自旋跑任务的线程,跑完当前任务之后就会去getTask,从任务队列中获取任务。我们之前重写的afterExecute方法在这里可以看到了。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
退出线程的方法,这个方法会对缩容的时候有影响,如果阻塞队列还有任务它将缩小到1,如果没有就缩小到0。无论是否核心线程都会被释放掉。
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
这边有个小插曲,就是所有的状态它都定义在一个整型数的最高的三位。而线程池的最大线程数为Integer.SIZE-3,通过一堆位运算判断线程池的状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }