keyxiaodj 2019-09-08
这篇文章对ThreadPoolExecutor创建的线程池如何操作线程的生命周期通过源码的方式进行详细解析。通过对execute方法、addWorker方法、Worker类、runWorker方法、getTask方法、processWorkerExit从源码角度详细阐述,文末有彩蛋。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); /** * workerCountOf方法取出低29位的值,表示当前活动的线程数; * 如果当前活动的线程数小于corePoolSize,则新建一个线程放入线程池中,并把该任务放到线程中 */ if (workerCountOf(c) < corePoolSize) { /** * addWorker中的第二个参数表示限制添加线程的数量 是根据据corePoolSize 来判断还是maximumPoolSize来判断; * 如果是ture,根据corePoolSize判断 * 如果是false,根据maximumPoolSize判断 */ if (addWorker(command, true)) return; /** * 如果添加失败,则重新获取ctl值 */ c = ctl.get(); } /** * 如果线程池是Running状态,并且任务添加到队列中 */ if (isRunning(c) && workQueue.offer(command)) { //double-check,重新获取ctl的值 int recheck = ctl.get(); /** * 再次判断线程池的状态,如果不是运行状态,由于之前已经把command添加到阻塞队列中,这时候需要从队列中移除command; * 通过handler使用拒绝策略对该任务进行处理,整个方法返回 */ if (!isRunning(recheck) && remove(command)) reject(command); /** * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法; * 第一个参数为null,表示在线程池中创建一个线程,但不去启动 * 第二个参数为false,将线程池的线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); /** * 执行到这里,有两种情况: * 1、线程池的状态不是RUNNING; * 2、线程池状态是RUNNING,但是workerCount >= corePoolSize, workerQueue已满 * 这个时候,再次调用addWorker方法,第二个参数传false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 如果失败则执行拒绝策略; */ } else if (!addWorker(command, false)) reject(command); }
简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:
务;
加到该阻塞队列中;
maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新
提交的任务;
据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为
任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中
获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是
为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
addWorker方法的主要作用是在线程池中创建一个新的线程并执行,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize ,false表示新增线程前需要判断当前活动的线程数是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) { retry: /** * 由于线程执行过程中,各种情况都有可能处于,通过自旋的方式来保证worker的增加; */ for (; ; ) { int c = ctl.get(); //获取线程池运行状态 int rs = runStateOf(c); /** * * 如果rs >= SHUTDOWN, 则表示此时不再接收新任务; * 接下来是三个条件 通过 && 连接,只要有一个任务不满足,就返回false; * 1.rs == SHUTDOWN,表示关闭状态,不再接收提交的任务,但却可以继续处理阻塞队列中已经保存的任务; * 2.fisrtTask为空 * 3.Check if queue empty only if necessary. */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { //获取线程池的线程数 int wc = workerCountOf(c); /** * 如果线程数 >= CAPACITY, 也就是ctl的低29位的最大值,则返回false; * 这里的core用来判断 限制线程数量的上限是corePoolSize还是maximumPoolSize; * 如果core是ture表示根据corePoolSize来比较; * 如果core是false表示根据maximumPoolSize来比较; */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /** * 通过CAS原子的方式来增加线程数量; * 如果成功,则跳出第一个for循环; */ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果当前运行的状态不等于rs,说明线程池的状态已经改变了,则返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //根据firstTask来创建Worker对象 w = new Worker(firstTask); //每一个Worker对象都会创建一个线程 final Thread t = w.thread; if (t != null) { //创建可重入锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 获取线程池的状态 int rs = runStateOf(ctl.get()); /** * 线程池的状态小于SHUTDOWN,表示线程池处于RUNNING状态; * 如果rs是RUNNING状态或rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程; * 因为在SHUTDOWN状态时不会再添加新的任务,但还是处理workQueue中的任务; */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //workers是一个hashSet workers.add(w); int s = workers.size(); //largestPoolSize记录线程池中出现的最大的线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程,Worker实现了Running方法,此时会调用Worker的run方法 t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
线程池中的每一个对象被封装成一个Worker对象,ThreadPool维护的就是一组Worker对象。
Worker类继承了AQS,并实现了Runnable接口,其中包含了两个重要属性:firstTask用来保存传入的任务,thread是在调用构造方法是通过ThreadFactory来创建的线程,是用来处理任务的线程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { /** * 把state设置为-1,,阻止中断直到调用runWorker方法; * 因为AQS默认state是0,如果刚创建一个Worker对象,还没有执行任务时,这时候不应该被中断 */ setState(-1); this.firstTask = firstTask; /** * 创建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程; * 所以一个Worker对象在启动的时候会调用Worker类中run方法 */ this.thread = getThreadFactory().newThread(this); } }
Worker类继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现?
可以看到tryAcquire方法,他是不允许重入的,而ReentrantLock是允许可重入的:
所以,Worker继承自AQS,用于判断线程是否空闲以及是否处于被中断。
protected boolean tryAcquire(int unused) { /** * cas修改state,不可重入; * state根据0来判断,所以Worker构造方法中讲state置为-1是为了禁止在执行任务前对线程进行中断; * 因此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0 */ if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
在Worker类中的run方法调用了runWorker方法来执行任务
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //获取第一个任务 Runnable task = w.firstTask; w.firstTask = null; //允许中断 w.unlock(); //是否因异常退出循环 boolean completedAbruptly = true; try { //如果task为空,则通过getTask来获取任务 while (task != null || (task = getTask()) != null) { w.lock(); /** * 如果线程池正在停止,那么要保证当前线程时中断状态; * 如果不是的话,则要保证当前线程不是中断状态 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //beforeExecute和afterExecute是留给子类来实现的 beforeExecute(wt, task); Throwable thrown = null; try { //通过任务方式执行,不是线程方式 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //processWorkerExit会对completedAbruptly进行判断,表示在执行过程中是否出现异常 processWorkerExit(w, completedAbruptly); } }
总结一下runWorker方法的执行过程:
getTask方法用于从阻塞队列中获取任务
private Runnable getTask() { //timeout变量的值表示上次从阻塞队列中获取任务是否超时 boolean timedOut = false; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); /** * 如果rs >= SHUTDOWN,表示线程池非RUNNING状态,需要再次判断: * 1、rs >= STOP ,线程池是否正在STOP * 2、阻塞队列是否为空 * 满足上述条件之一,则将workCount减一,并返回null; * 因为如果当前线程池的状态处于STOP及以上或队列为空,不能从阻塞队列中获取任务; */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * timed变量用于判断是否需要进行超时控制; * allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; * wc > corePoolSize,表示当前线程数大于核心线程数量; * 对于超过核心线程数量的这些线程,需要进行超时控制; */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize方法; * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时; * 接下来判断,如果有效咸亨数量大于1,或者workQueue为空,那么将尝试workCount减1; * 如果减1失败,则返回重试; * 如果wc==1时,也就说明当前线程是线程池中的唯一线程了; */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } /** * timed为trure,则通过workQueue的poll方法进行超时控制,如果在keepAliveTime时间内没有获取任务,则返回null; * 否则通过take方法,如果队列为空,则take方法会阻塞直到队列中不为空; */ try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //如果r==null,说明已经超时了,timedOut = true; timedOut = true; } catch (InterruptedException retry) { //如果获取任务时当前线程发生了中断,则将timedOut = false; timedOut = false; } } }
注意:第二个if判断,目的是为了控制线程池的有效线程数量。
有上文分析得到,在execute方法时,如果当前线程池的线程数量超过coolPoolSize且小于maxmumPoolSize,并且阻塞队列已满时,则可以通过增加工作线程。但是如果工作线程在超时时间内没有获取到任务,timeOut=true,说明workQueue为空,也就说当前线程池不需要那么多线程来执行任务了,可以把多于的corePoolSize数量的线程销毁掉,保证线程数量在corePoolSize即可。
什么时候会销毁线程?
当然是runWorker方法执行完后,也就是Worker中的run方法执行完后,由JVM自动回收。
private void processWorkerExit(Worker w, boolean completedAbruptly) { /** * 如果completedAbruptly为true,则说明线程执行时出现异常,需要将workerCount数量减一 * 如果completedAbruptly为false,说明在getTask方法中已经对workerCount进行减一,这里不用再减 */ if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; //从workers中移除,也就表示从线程池中移除一个工作线程 workers.remove(w); } finally { mainLock.unlock(); } //钩子函数,根据线程池的状态来判断是否结束线程池 tryTerminate(); int c = ctl.get(); /** * 当前线程是RUNNING或SHUTDOWN时,如果worker是异常结束,那么会直接addWorker; * 如果allowCoreThreadTimeOut=true,那么等待队列有任务,至少保留一个worker; * 如果allowCoreThreadTimeOut=false,workerCount少于coolPoolSize */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,processWorkerExit执行完之后,工作线程被销毁。
工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit,整个线程结束。
还没关注我的公众号?