当前位置:首页 > 经验 >

线程池内部结构和实现原理(线程池的底层原理和实现方法)

来源:原点资讯(www.yd166.com)时间:2022-11-03 05:39:37作者:YD166手机阅读>>

线程池的基本结构

线程池的基本结构,如下图所示。用户通过使用线程池的execute方法将Runnable提交到线程池中进行执行。当线程池中的线程都很忙的时候,这个新加入的 Runnable 就会被放到等待队列。当线程空闲下来的时候,就会去等待队列里查看是否还有排队等待的任务,如果有,就会队列中取出任务并继续执行。如果没有,线程就会进入休眠。

线程池内部结构和实现原理,线程池的底层原理和实现方法(1)

如果我们更具体一点,就以本周第二节课所讲的ThreadPoolExecutor为例。当我们把一个Runnable交给线程池去执行的时候,这个线程池处理的流程是这样的:

  1. 先判断线程池中的核心线程们是否空闲,如果空闲,就把这个新的任务指派给某一个空闲线程去执行。如果没有空闲,并且当前线程池中的核心线程数还小于 corePoolSize,那就再创建一个核心线程。

  2. 如果线程池的线程数已经达到核心线程数,并且这些线程都繁忙,就把这个新来的任务放到等待队列中去。如果等待队列又满了,那么

  3. 查看一下当前线程数是否到达maximumPoolSize,如果还未到达,就继续创建线程。如果已经到达了,就交给RejectedExecutionHandler来决定怎么处理这个任务。

我们通过代码来验证一下上面讲的内容:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

看上去应该是一致的。另外,这份代码里有一段英文的注释,为了节约篇幅,我删掉了,大家可以去JDK的源代码里自行查看。

工作线程和等待队列

工作线程里的逻辑都封装在Worker这个内部类里了。代码如下所示:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks ; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

关于锁和并发的问题,我们先不去管他。这段代码里有几个地方要注意的,第一,我们可以使用beforeExecute和afterExecute这两个方法去监控任务的执行情况,这些方法在ThreadPoolExecutor里都是空方法,我们可以重写这些方法来实现线程池的监控。第二,就是线程的逻辑是不断地执行一个循环,去调用 getTask 方法来获得任务(第一个任务是firstTask,这个请大家自己去看,我就不讲了)。所以这个getTask有必要研究一下:

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

这个方法里,前边的计数逻辑,我们先不看,这个方法的核心点在于workQueue.poll,workQueue的类型是

private final BlockingQueue<Runnable> workQueue;

这是一个阻塞队列!!这就意味着,如果某一个线程试图从这个队列中取数据,而这个队列里没有数据的时候,线程就会进入休眠了。

至于前边的那些逻辑,加加减减的,并不重要,感兴趣的可以自己查看这个方法的注释,总而言之,就是返回一个null,让worker线程退出。所以,getTask里能返回 null 的分支都是满足线程退出条件的。

RejectedExecutionHandler

当队列和线程池都满了的时候,再有新的任务到达,就必须要有一种办法来处理新来的任务。Java线程池中提供了以下四种策略:

  1. AbortPolicy: 直接抛异常

  2. CallerRunsPolicy:让调用者帮着跑这个任务

  3. DiscardOldestPolicy:丢弃队列里最老的那个任务,执行当前任务

  4. DiscardPolicy:不处理,直接扔掉

例如,我们看一个例子:

public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }

这个例子就是,如果线程池和队列都满了,那做为调用者,您就自己去执行 r.run 吧,我线程池是不管了。您爱咋咋滴。另外,让调用者去自己执行,也可以让调用者不要再往线程池里放任务了,有帮于减轻线程池的压力。

栏目热文

java线程池工作原理(mycat和sharding优缺点)

java线程池工作原理(mycat和sharding优缺点)

关注公众号领资料搜索公众号【Java耕耘者】,回复【Java】,即可获取大量优质电子书和一份Java高级架构资料、Spr...

2022-11-03 05:30:30查看全文 >>

线程池底层原理(线程池设计原理图解)

线程池底层原理(线程池设计原理图解)

java的线程池说白了就是建一个任务池子跟一个线程池子,当任务池子里的任务过来了就由线程池里的活跃线程去消费,等任务池子...

2022-11-03 05:58:11查看全文 >>

线程池工作原理图解(线程池的结构和原理)

线程池工作原理图解(线程池的结构和原理)

前言本文以程序员做需求的例子,比喻线程池的工作过程。以故事白话的方式展开,跟大家阐述线程池工作原理,以方便大家更好理解线...

2022-11-03 05:52:26查看全文 >>

线程池原理和实例(线程池的结构和原理)

线程池原理和实例(线程池的结构和原理)

前言线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调...

2022-11-03 06:03:39查看全文 >>

mycat和sharding优缺点(mycat的替代品)

mycat和sharding优缺点(mycat的替代品)

在我们的项目发展到一定阶段之后,随着数据量的增大,分库分表就变成了一件非常自然的事情。常见的分库分表方式有两种:客户端模...

2022-11-03 05:56:38查看全文 >>

线程池的拒绝策略(线程池拒绝策略有哪些)

线程池的拒绝策略(线程池拒绝策略有哪些)

jdk1.5版本新增了 JUC 并发包,其中一个包含线程池。四种拒绝策略:拒绝策略类型说明1ThreadPoolExec...

2022-11-03 06:05:44查看全文 >>

线程池满了不丢弃任务(线程池队列满了如何处理)

线程池满了不丢弃任务(线程池队列满了如何处理)

public static ExecutorService newFixedThreadPool(int nThread...

2022-11-03 05:49:52查看全文 >>

自制小麻薯手帐胶带(小麻薯手帐胶带五元100个)

自制小麻薯手帐胶带(小麻薯手帐胶带五元100个)

{"rich_content":{"text":"","spans":null},"video":{"vid":"v02...

2022-11-03 05:52:54查看全文 >>

自己制作手帐胶带的方法(如何自制手帐胶带美观又简单)

自己制作手帐胶带的方法(如何自制手帐胶带美观又简单)

相信大多数人和我一样,最初入坑手帐,都是从胶带开始“沦陷”的。但是一旦你开始囤胶带,必然会“选择困难”,和纸、PET、水...

2022-11-03 05:44:37查看全文 >>

自制十二星座手帐胶带(十二星座闺蜜手帐)

自制十二星座手帐胶带(十二星座闺蜜手帐)

星座本来是一群群恒星的组合,却也与人有着密不可分的联系。常常会有人问你:“你是什么星座”?甚至很多小仙女都用星座来做匹配...

2022-11-03 05:43:32查看全文 >>

文档排行