一 基本概念
线程池解决两个不同的问题:由于每个任务的调用开销减少,它们通常在执行大量异步任务时提供改进的性能,并且它们提供了一种限制和管理资源(包括执行一个任务。 每个ThreadPoolExecutor
还维护一些基本统计信息,例如已完成任务的数量。
为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 然而,程序员被敦促使用更方便的Executors
工厂方法Executors.newCachedThreadPool()
(无限线程池,具有自动线程回收), Executors.newFixedThreadPool(int)
(固定大小的线程池)和Executors.newSingleThreadExecutor()
(单个后台线程),可以预先配置最常用的使用场景设置。 否则,手动配置和调优此类时,请使用以下指南:
1.1 Core and maximum pool sizes
ThreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。 当在方法execute(Runnable)中提交新任务且运行的线程数少于corePoolSize线程时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。 如果运行的线程数大于corePoolSize但小于maximumPoolSize,则仅在队列已满时才创建新线程。 通过将corePoolSize和maximumPoolSize设置为相同,可以创建固定大小的线程池。 通过将maximumPoolSize设置为一个本质上不受限制的值(例如Integer.MAX_VALUE),可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构造时设置,但也可以使用setCorePoolSize和setMaximumPoolSize动态更改。
corePoolSize : 除非设置allowCoreThreadTimeOut,否则核心池大小是保持活动状态(且不允许超时等)的最小工作线程数,在这种情况下,最小值为零。
maximumPoolSize : 最大池大小。 请注意,实际最大值在内部受“容量”限制。
1.2 On-demand construction(按需施工)
默认情况下,甚至只有在有新任务到达时才开始甚至启动核心线程,但是可以使用prestartCoreThread或prestartAllCoreThreads方法动态地覆盖它。 如果使用非空队列构造池,则可能要预启动线程
1.3 Creating new threads (创建新线程)
使用ThreadFactory创建新线程。 如果没有另外指定,则使用Executors.defaultThreadFactory,该线程创建的线程全部位于同一ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护程序状态。 通过提供不同的ThreadFactory,可以更改线程的名称,线程组,优先级,守护程序状态等。如果ThreadFactory在通过从newThread返回null返回要求时未能创建线程,执行器将继续执行,但可能无法执行 执行任何任务。 线程应具有“ modifyThread” RuntimePermission。 如果使用该池的工作线程或其他线程不具有此许可权,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能会处于可能终止但未完成的状态
1.4 Keep-alive times
如果当前池中的线程数超过corePoolSize,则多余的线程将在空闲时间超过keepAliveTime时终止(请参阅getKeepAliveTime(TimeUnit))。 当不积极使用池时,这提供了减少资源消耗的方法。 如果池稍后变得更加活跃,则将构建新线程。 也可以使用setKeepAliveTime(long,TimeUnit)方法动态更改此参数。 使用Long.MAX_VALUE TimeUnit.NANOSECONDS值可以有效地使空闲线程永远不会在关闭之前终止。 默认情况下,仅当存在多个corePoolSize线程时,保持活动策略才适用。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)还可用于将此超时策略应用于核心线程。
1.5 Queuing (排队)
任何BlockingQueue均可用于传输和保留提交的任务。 此队列的使用与池大小交互:
- 如果运行的线程数少于corePoolSize,则执行程序总是喜欢添加新线程,而不是排队。
- 如果正在运行corePoolSize或更多线程,则执行程序总是更喜欢对请求进行排队,而不是添加新线程。
- 如果无法将请求放入队列中,则将创建一个新线程,除非该线程超过了maximumPoolSize,在这种情况下,该任务将被拒绝。
有三种一般的排队策略:
- 直接交接。工作队列的一个很好的默认选择是SynchronousQueue,它可以将任务移交给线程,而不必另外保留它们。在这里,如果没有立即可用的线程来运行任务,则尝试将任务排队将失败,因此将构造一个新线程。在处理可能具有内部依赖项的请求集时,此策略避免了锁定。直接切换通常需要无限制的maximumPoolSizes以避免拒绝新提交的任务。反过来,当平均而言,命令继续以比其处理速度更快的速度到达时,就可以实现无限线程增长的可能性。
- 无限队列。使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将在所有corePoolSize线程繁忙时使新任务在队列中等待。因此,将仅创建corePoolSize线程。 (因此,maximumPoolSize的值没有任何作用。)当每个任务完全独立于其他任务时,这可能是适当的,因此任务不会影响彼此的执行。例如,在网页服务器中。尽管这种排队方式对于消除短暂的请求突发很有用,但它承认当命令平均继续以比处理速度更快的速度到达时,工作队列会无限增长。
- 有界队列。与有限的maximumPoolSizes一起使用时,有界队列(例如ArrayBlockingQueue)有助于防止资源耗尽,但调优和控制起来会更加困难。队列大小和最大池大小可能会相互折衷:使用大队列和小池可以最大程度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为地降低吞吐量。如果任务频繁阻塞(例如,如果它们受I / O约束),则系统可能可以为您安排的线程调度的时间超出您的允许范围。使用小队列通常需要更大的池大小,这会使CPU繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
1.6 Rejected tasks
当执行器关闭时,并且在执行器对最大线程和工作队列容量使用有限范围时,执行器将关闭在方法execute(Runnable)中提交的新任务。 无论哪种情况,execute方法都会调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(Runnable,ThreadPoolExecutor)方法。 提供了四个预定义的处理程序策略:
- 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时会抛出运行时RejectedExecutionException。
- 在ThreadPoolExecutor.CallerRunsPolicy中,调用执行自己的线程运行任务。 这提供了一种简单的反馈控制机制,它将降低新任务的提交速度。
- 在ThreadPoolExecutor.DiscardPolicy中,简单地删除了无法执行的任务。
- 在ThreadPoolExecutor.DiscardOldestPolicy中,如果未关闭执行程序,则将丢弃工作队列开头的任- 务,然后重试执行(该操作可能再次失败,导致重复执行此操作)。
可以定义和使用其他种类的RejectedExecutionHandler类。 这样做需要格外小心,尤其是在设计策略仅在特定容量或排队策略下工作时。
1.7 Hook methods
此类提供受保护的可重写的beforeExecute(Thread,Runnable)和afterExecute(Runnable,Throwable)方法,这些方法在执行每个任务之前和之后调用。 这些可以用来操纵执行环境。 例如,重新初始化ThreadLocals,收集统计信息或添加日志条目。 此外,一旦执行程序完全终止,可以终止方法终止以执行需要执行的任何特殊处理。
如果钩子或回调方法引发异常,内部工作线程可能进而失败并突然终止。
1.8 Queue maintenance
方法getQueue()允许访问工作队列,以进行监视和调试。 强烈建议不要将此方法用于任何其他目的。 当取消大量排队的任务时,可以使用提供的两种方法remove(Runnable)和purge来帮助回收存储。
1.9 Finalization
程序中不再引用且没有剩余线程的池将自动关闭。 如果即使在用户忘记调用shutdown的情况下也要确保收回未引用的池,则必须使用零核心线程的下限和/或设置allowCoreThreadTimeOut来设置适当的保活时间,以安排未使用的线程最终死掉 (布尔值)。
二 源码相关
2.1 核心属性
主池控制状态ctl是一个打包两个概念字段workerCount的原子整数,指示线程的有效数量runState,指示是否运行,关闭等。为了将它们打包为一个int,我们将workerCount限制为(2 ^ 29 )-1(约5亿)个线程,而不是(2 ^ 31)-1(20亿)可以表示的线程。如果将来有问题,可以将该变量更改为AtomicLong,并在下面调整shift / mask常数。但是在需要之前,使用int可以使此代码更快,更简单。 workerCount是已被允许启动但不允许停止的工人数。该值可能与活动线程的实际数量暂时不同,例如,当ThreadFactory在被询问时未能创建线程,并且退出线程仍在终止之前执行簿记操作时,该值会有所不同。用户可见的池大小报告为工作集的当前大小。 runState提供主要的生命周期控制,并具有以下值:RUNNING:接受新任务并处理排队的任务SHUTDOWN:不接受新任务,但处理排队的任务STOP:不接受新任务,不处理排队的任务,并中断进行中的任务TIDYING:所有任务都已终止,workerCount为零,线程转换到TIDYING状态将运行Terminated()挂钩方法TERMINATED:terminald()已完成这些值之间的数值顺序很重要,可以进行有序比较。 runState随时间单调增加,但不必达到每个状态。转换为:RUNNING-> SHUTDOWN在调用shutdown()时,可能隐式在finalize()中(RUNNING或SHUTDOWN)-> STOP在调用shutdownNow()时SHUTDOWN-> TIDYING当队列和池都为空时STOP-> TIDYING当池为空时TIDYING-> TERMINATED当Terminated()挂钩方法完成时,状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。检测从SHUTDOWN到TIDYING的转换并不像您想要的那样简单,因为在SHUTDOWN状态期间,队列在非空之后可能会变空,反之亦然,但是只有在看到它为空之后,我们看到workerCount为0(有时需要重新检查-参见下文)。
1 | private final ReentrantLock mainLock = new ReentrantLock(); |
2.2 构造函数
默认情况下,被拒绝的任务的处理程序为 AbortPolicy ,即 被拒绝的任务的处理程序,抛出一个 RejectedExecutionException
在使用构造函数时,可能会在以下情况时产生运行时异常
IllegalArgumentException
corePoolSize < 0 keepAliveTime < 0 maximumPoolSize <= 0 maximumPoolSize < corePoolSize
NullPointerException
workQueue is null
在构造函数中使用到的队列的情况如下
队列还支持以下操作:在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用。
BlockingQueue方法有四种形式,它们以不同的方式处理操作,这些方法无法立即满足,但将来可能会满足:一种抛出异常,第二种返回特殊值(null或false,具体取决于 操作),第三个块将无限期地阻塞当前线程,直到操作成功为止;第四个块仅在给定的最大时间限制内放弃。 下表总结了这些方法:
BlockingQueue不接受空元素。在尝试添加,放置或提供null时,实现会引发NullPointerException。空值用作标记值,以指示轮询操作失败。
BlockingQueue可能受容量限制。在任何给定时间,它可能具有剩余容量,超过该容量就不能放置其他元素而不会阻塞。没有任何内部容量约束的BlockingQueue始终报告Integer.MAX_VALUE的剩余容量。
BlockingQueue实现被设计为主要用于生产者-消费者队列,但另外还支持Collection接口。因此,例如,可以使用remove(x)从队列中删除任意元素。但是,这样的操作通常不能非常有效地执行,并且仅用于偶尔的使用,例如在取消排队的消息时。
BlockingQueue实现是线程安全的。所有排队方法都是使用内部锁或其他形式的并发控制来原子地实现其效果的。但是,除非在实现中另行指定,否则批量Collection操作addAll,containsAll,retainAll和removeAll不一定是原子执行的。因此,例如,仅在c中添加一些元素之后,addAll(c)可能会失败(引发异常)。
BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流尾对象或有毒对象,当消费者采取这种方法时会对其进行相应的解释。
使用示例,基于典型的生产者-消费者方案。请注意,BlockingQueue可以安全地与多个生产者和多个消费者一起使用。
2.3 核心方法
1 | /** |
由此可见最终执行的都是 execute 方法,该方法的定义如下
1 | /** |
添加一个任务主要分为以下三步
- 1.如果正在运行的线程少于corePoolSize线程,请尝试执行以下操作:使用给定的命令作为第一个启动新线程任务。 对addWorker的调用自动检查runState和workerCount,这样可以防止假警报增加通过返回false返回不应该执行的线程。
- 2.如果任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加线程(因为现有的自上次检查后死亡)或自进入此方法以来,该池已关闭。 所以我们重新检查状态,并在必要时回退排队停止,如果没有,则启动一个新线程。
- 3.如果我们无法将任务排队,那么我们尝试添加一个新的线。 如果失败,我们知道我们已经关闭或饱和,因此拒绝任务。
在将来的某个时间执行给定的任务。 该任务可以在新线程或现有池线程中执行。 如果由于该执行器已关闭或已达到其能力而无法提交执行任务,则该任务由当前的RejectedExecutionHandler处理。
实际上添加任务的方法的定义如下
1 | /** |
该部分代码主要目的是检查是否可以针对当前池状态和给定的边界(核心或最大值)添加新的工作程序。 如果是这样,则会相应地调整工作程序计数,并且如果可能,会创建并启动一个新的工作程序,并运行firstTask作为其第一个任务。 如果池已停止或有资格关闭,则此方法返回false。 如果在询问时线程工厂无法创建线程,则它还会返回false。 如果线程创建失败(由于线程工厂返回null或由于异常(通常是Thread.start()中的OutOfMemoryError)),我们将进行干净的回滚。
在添加失败时会触发拒绝策略,改部分代码如下
1 | /** |
三 实践
3.1 任务时间小于Keep-alive times
3.1.1 超过最大线程数
本例子中,核心线程数为1,最大线程是2,队列的最大容量为10,尝试添加5个线程
1 | public class ThreadDemo { |
运行上述程序,程序正常,得到以下的结果
1 | 1612506856883 |
【注意】代码运行完成之后,此进程未关闭
3.1.2 超过队列容量
本例子中,核心线程数为1,最大线程是2,队列的最大容量为5,尝试添加10个线程
1 | public class ThreadDemo { |
运行程序,得到以下结果
1 | Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@335eadca rejected from java.util.concurrent.ThreadPoolExecutor@6ddf90b0[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0] |
再次运行程序
1 | Exception in thread "main" 1612507356405 |
可以看得出,两次运行的结果报错的位置不一样,但是两次运行均失败,在运行过程中发生了异常情况
3.1.3 结论
通过上述实例可知,在任务时间小于Keep-alive时,或者说Keep-alive时间为一个相对较大的值的时候,任务队列决定着最大可接收的任务的数量
3. 2任务时间大于Keep-alive times
3.2.1 超过最大线程数
本例子中,核心线程数为1,最大线程是2,队列的最大容量为10,尝试添加5个线程
其中每个线程里的任务完成时间需要4秒钟,Keep-alive times为3秒钟
1 | public class ThreadDemo { |
多次运行上述程序,程序运行正常。
3.2.2 超过队列容量
本例子中,核心线程数为1,最大线程是2,队列的最大容量为5,尝试添加10个线程
其中每个线程里的任务完成时间需要4秒钟,Keep-alive times为3秒钟
1 | public class ThreadDemo { |
多次运行程序,其中数次的运行结果如下
1 | Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@335eadca rejected from java.util.concurrent.ThreadPoolExecutor@6ddf90b0[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0] |
另一次为
1 | Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@335eadca rejected from java.util.concurrent.ThreadPoolExecutor@6ddf90b0[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0] |
可以看到,两次执行时均有异常发生,并且任务未全部执行完成。