因为代码(1)处的逻辑不利于理解,我们通过(1)的等价实现来理解:
if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
//等价实现
rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
其含义为,满足下列条件之一则直接返回false,线程创建失败:
- rs > SHUTDOWN,也就是STOP,TIDYING或TERMINATED,此时不再接受新的任务,且中断正在执行的任务
- rs = SHUTDOWN且firstTask != null,此时不再接受任务,但是仍会处理任务缓存队列中的任务
- rs = SHUTDOWN,队列为空
多说一句,若线程池处于 SHUTDOWN, firstTask 为 null,且 workQueue 非空,那么还得创建线程继续处理任务缓存队列中的任务。
总结一下,addWorker()方法完成了如下几件任务:
- 原子性的增加workerCount
- 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中
- 启动worker对应的线程
- 若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount
3.工作线程的实现
从addWorker()方法的实现可以看出,工作线程的创建和启动都跟ThreadPoolExecutor中的内部类Worker有关。下面我们分析Worker类来看一下工作线程的实现。
Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker的主要字段就下面三个,代码也比较简单。
//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来
final Thread thread;
//worker所对应的第一个任务,可能为空
Runnable firstTask;
//记录当前线程完成的任务数
volatile long completedTasks;
Worker的构造函数如下。
Worker(Runnable firstTask) {
//设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断
setState(-1);
//初始化第一个任务
this.firstTask = firstTask;
//利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
//也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
this.thread = getThreadFactory().newThread(this);
}
Worker类继承了AQS类,重写了其相应的方法,实现了一个自定义的同步器,实现了不可重入锁。
Worker类还提供了一个中断线程thread的方法。
void interruptIfStarted() {
Thread t;
//AQS状态大于等于0,worker对应的线程不为null,且该线程没有被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
再来看一下Worker类的run()方法的实现,会发现run()方法最终调用了ThreadPoolExecutor类的runWorker()方法。
public void run() {
runWorker(this);
}
4.线程复用机制
通过上文可以知道,worker中的线程start 后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。
runWorker()方法是线程池的核心,实现了线程池中的线程复用机制,来看一下
runWorker()方法都做了哪些工作:
- 运行第一个任务firstTask之后,循环调用getTask()方法获取任务,不断从任务缓存队列获取任务并执行;
- 获取到任务之后就对worker对象加锁,保证线程在执行任务的过程中不会被中断,任务执行完会释放锁;
- 在执行任务的前后,可以根据业务场景重写beforeExecute()和afterExecute()等Hook方法;
- 执行通过getTask()方法获取到的任务
- 线程执行结束后,调用processWorkerExit()方法执行结束线程的一些清理工作
从runWorker()方法的实现可以看出,runWorker()方法中主要调用了getTask()方法和processWorkerExit()方法,下面分别看一下这两个方法的实现。
getTask()的实现
getTask()方法用来不断地从任务缓存队列获取任务并交给线程执行,下面分析一下其实现。
接下来总结一下getTask()方法会在哪些情况下返回:
- 线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象
- 线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象
- 线程池状态大于等于STOP,返回null,回收线程
- 线程池处于SHUTDOWN状态,并且阻塞队列为空,返回null,回收线程
- worker数量大于maximumPoolSize,返回null,回收线程
- 线程空闲时间超时,返回null,回收线程
processWorkerExit()的实现
processWorkerExit()方法负责执行结束线程的一些清理工作,下面分析一下其实现。