源码学习之线程池

xzkjgw 2020-01-23

大家面试过程中肯定被问道过线程池。为什么要使用线程池呢?因为在系统中频繁创建线程会造成很大的CPU消耗。而且用完的线程要等待GC回收也会造成消耗。

下面我们就来学习下最常用的线程池 ThreadPoolExecutor,

首先先来看看它的构造方法:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数说明:

corePoolSize:核心线程数,线程池维持的正常活跃(保障不会线程超时)的最小线程数,不允许为0,除非设置允许线程超时等待(如果提交的任务数量大于这个参数时,提交的任务将被放入缓存队列)。
maximumPoolSize:最大线程数,即为线程池可以容纳的最大线程数。
keepAliveTime: 空闲的线程存活时间。当存在超过corePoolSize或设置了允许核心线程空闲超时的时候,线程将在等待这么长时间后自动销毁(默认以纳秒为单位)。
unit: 空闲的线程存活时间的单位。
workQueue: 等待队列,注意这个队列是阻塞的,当队列中没有任务时线程获取任务将被阻塞,直到有任务时候被唤醒。
threadFactory: 线程工厂,工厂模式大家都懂的,如果有需要自定义线程,实现ThreadFactory接口从newThread方法返回你自定义的线程类就好了。
handler: 拒绝策略处理器,在线程池无法接收新的工作时候会调用该处理器的拒绝方法拒绝掉一些任务。

线程池拒绝策略处理接口:RejectedExecutionHandler
实现类:
直接拒绝策略(不会报错):DiscardPolicy 
直接拒绝策略(将抛出一个RejectedExecutionException错误):AbortPolicy 
让放入的线程自己运行任务:CallerRunsPolicy
抛弃等待时间最长的任务:DiscardOldestPolicy

看完了这些参数后是不是还是一头雾水?哈哈,其实我也是这样。要了解线程池,先要了解它的状态。

private static final int COUNT_BITS = Integer.SIZE - 3;
    //CAPACITY 值为 1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
    // (1 << 29) -1 = 0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    //线程池状态
    // -1 << 29 = 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态:
RUNNING:取30位到32位111代表运行中,可以接收新的任务并且会处理等待队列中的任务。
SHUTDOWN:取30位到32位000代表关闭中,不中断正在执行的任务,但是会中断等待中的线程。然后等待队列中剩余的任务处理完就关闭。
STOP: 取30位到32位001代表停止,不接受新任务也不处理等待队列中的任务。并且会中断正在运行的任务。
TIDYING: 清取30位到32位010代表理中,最后的扫尾工作,关闭中到结束的中间状态,所有任务已经完成,工作线程数workerCount为0,线程池将执行结束方法。
TERMINATED: 取30位到32位011代表结束,结束方法完成,线程池结束。
线程池状态转换:
RUNNING -> SHUTDOWN: 调用了shutdown()方法之后,线程池会开始拒绝接收新的任务,从运行中转变为关闭中的状态。一般是在线程池完成工作需要销毁时调用。
RUNNING -> STOP: 调用了shutdownNow()方法之后,线程池会拒绝接受任务并中断所以正在执行的任务,从运行状态转入停止状态。
SHUTDOWN -> STOP: 调用了shutdownNow()方法之后,线程池会拒绝接受任务并中断所以正在执行的任务,从关闭中转为停止状态。
SHUTDOWN -> TIDYING: 线程池本身处于停止中的状态,队列和线程池都为空的时候。线程进入清理状态。
STOP -> TIDYING: 当线程池为空时,线程池从停止中状态转入清理状态。
TIDYING -> TERMINATED: 当线程清理完成会回调 terminated() 方法,完成后线程正式结束。

现在不明白这些状态是干什么用的,可以先放到一边,接下来我们要看一下线程池的几个比较核心的方法。为了让代码更容易理解,我在上面加入了注释。

// 获取方法状态, 取计数器的30-32位作为线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //取计数器的后29位作为线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }


    //线程池执行方法
    public void execute(Runnable command) {
        if (command == null){
            throw new NullPointerException();
        }
        //获取计数器的数值
        int c = ctl.get();

        /*
         * 如果运行的线程数少于corePoolSize核心线程数,就启动一个新的线程来执行该任务。
         * addWorker方法将以原子方式检查runState(运行状态)和workerCount(工作中总线程数),
         * 如果运行状态为关闭或者工作中总线程数等于核心线程数返回false。
         */
        //workerCountOf 取计数器的后29位作为线程数 和核心线程数进行对比
        if (workerCountOf(c) < corePoolSize) {
            //启动一个新的线程来完成工作
            if (addWorker(command, true)){
                return;
            }
            //获取计数器的最新数值
            c = ctl.get();
        }
        /*
         * isRunning 方法是通过原子的计数器获取的数值来判断运行状态(注意如果是运行状态前三位一定是111整个数值是个负数),
         * 如果运行状态是运行状态,就尝试向等待队列末尾插入一个任务。
         */
        if (isRunning(c) && workQueue.offer(command)) {
            //获取计数器的最新数值
            int recheck = ctl.get();
            //再次判断线程池状态,如果这时候线程池进入其他状态,折删除刚刚添加的任务,并且在remove中调用了中断空闲线程的方法
            if (!isRunning(recheck) && remove(command)){
                //调用拒绝策略
                reject(command);
            }
            /*
             * 如果线程池还处于运行中 或 者线程处于关闭中,但是等待队列任务删除失败。
             * 判断工作线程的数量为0的话,就创建一个空闲线程放入池中,让他将队列中的任务执行完后
             * 线程池在转入清理状态
             */
            else if (workerCountOf(recheck) == 0){
                addWorker(null, false);
            }
        }
        /*
         * 如果运行状态为关闭再次尝试调用 addWorker 方法运行这个任务,这时addWorker 
         * 会返回false。这时候执行拒绝策略(reject 方法底层其实调用的拒绝策略的拒绝方法)。
         * 如果线程状态为运行中,但是因为等待队列满了,插入失败,再次调用addWorker 尝试运行
         * 任务,并且该方法参数 core为false若线程数未超过 最大线程数会创建一个新的线程来运行任务
         * addWorker 方法下面有介绍
         */
        else if (!addWorker(command, false)){
            reject(command);
        }
    }

    //将任务加入工作中的方法,线程池的核心方法之一
    //参数 core 代表是不是比较核心线程数的意思。
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取计数器的数值
            int c = ctl.get();
            //取30位-32位判断线程状态
            int rs = runStateOf(c);
            /*
             * 如果线程池处于 停止,清理中,结束 等状态,直接返回false拒绝该任务
             * 如果线程处于 关闭中 且传入的任务不为空 也直接拒绝该任务。
             * 如果传入的任务为空,但是等待队列为空,证明没任务了不要创建新的线程,直接返回结束该方法。
             */
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())){
                return false;
            }
            for (;;) {
            
                //获取当前线程数
                int wc = workerCountOf(c);
                //判断线程数是否超过极限容量, 如果没超过 且core为true 在看看是否超过核心线程数
                // 如果 且core为false 则和最大线程数进行比较。 如果超过了就返回false拒绝该任务。
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                    return false;
                }
                //修改原子计数器的线程数 如果修改成功 就没必要在无限循环了 直接断开循环跳到最外层不在执行循环
                if (compareAndIncrementWorkerCount(c)){
                    break retry;
                }
                //如果上面的修改没有成功 再次获取原子计数器数值
                c = ctl.get();  // Re-read ctl
                //判断当前的状态十分和之前的一样如果一样继续循环修改 如果不一样则结束循环跳到最外层大循环,从新开始
                if (runStateOf(c) != rs){
                    continue retry;
                }
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //获取主锁,为了保证在并发下添加任务的安全性
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取到锁后 从新获取线程池运行状态
                    int rs = runStateOf(ctl.get());
                    //如果线程处于运行状态 或者 者线程处于关闭中,但是等待队列任务删除失败。且线程池中线程已全被中断
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        //判断线程是否是活跃的
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将新建的工作线程加入工作队列
                        workers.add(w);
                        //获取工作线程数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //设置工作线程加入成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //如果工作线程加入成功启动新的线程 并将线程状态设置为 true
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

下面顺便提一句,为什么阿里规范不让用 Executors.newSingleThreadExecutor(); 这样的方法创建线程池。

我们可以看看他的源码:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
LinkedBlockingQueue 这是一个阻塞无限队列,也就是说这个队列只要不是OOM了就可以一直往里面放。这样会造成如果线程数到达核心线程数以后还是处理不过来并不会继续创建线程,而是会一直往队列中塞任务,直到内存溢出。

相关推荐