线程池
线程池与new Thread
线程池的好处
- 重用存在的线程,减少对象创建、消亡的开销,性能好
- 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞。
- 提供定时执行、定期执行、单线程、并发数控制等功能。
new Thread的弊端
- 每次new Thread 新建对象,性能差
- 线程缺乏统一管理,可能无限制的新建线程,相互竞争,可能占用过多的系统资源导致死机或者OOM(out of memory 内存溢出),这种问题的原因不是因为单纯的new一个Thread,而是可能因为程序的bug或者设计上的缺陷导致不断new Thread造成的。
- 缺少更多功能,如更多执行、定期执行、线程中断。
线程池的结构
在线程池的类图中,我们最常使用的是最下边的Executors,用它来创建线程池使用线程。那么在上边的类图中,包含了一个Executor框架,它是一个根据一组执行策略的调用调度执行和控制异步任务的框架,目的是提供一种将任务提交与任务如何运行分离开的机制。它包含了三个executor接口:
- Executor:运行新任务的简单接口
- ExecutorService:扩展了Executor,添加了用来管理执行器生命周期和任务生命周期的方法
- ScheduleExcutorService:扩展了ExecutorService,支持Future和定期执行任务
ThreadPoolExecutor
工作原理
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于
corePoolSize
,那么马上创建线程运行这个任务; - 如果正在运行的线程数量大于或等于
corePoolSize
,那么将这个任务放入队列。 - 如果这时候队列满了,而且正在运行的线程数量小于
maximumPoolSize
,那么还是要创建线程运行这个任务; - 如果队列满了,而且正在运行的线程数量大于或等于
maximumPoolSize
,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(
keepAliveTime
)时,线程池会判断,如果当前运行的线程数大于corePoolSize
,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到corePoolSize
的大小。
配置项
1 | /** |
corePoolSize
:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue
任务队列中去maximumPoolSize
:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue
任务队列的类型,决定线程池会开辟的最大线程数量;keepAliveTime
:当线程数大于核心数时,多余的闲置线程在会在多长时间内被销毁unit
:keepAliveTime
的单位workQueue
:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;threadFactory
:线程工厂,用于创建线程,一般用默认即可;handler
:拒绝策略;当任务太多来不及处理时,如何拒绝任务;
线程池的阻塞队列
通道,有以下一些阻塞队列可供选择:
ArrayBlockingQueue
是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。DelayQueue
阻塞的是其内部元素,DelayQueue
中的元素必须实现java.util.concurrent.Delayed
接口,该接口只有一个方法就是long getDelay(TimeUnit unit)
,返回值就是队列元素被释放前的保持时间,如果返回0或者一个负值,就意味着该元素已经到期需要被释放,此时DelayedQueue
会通过其take()方法释放此对象,DelayQueue
可应用于定时关闭连接、缓存对象,超时处理等各种场景;LinkedBlockingQueue
阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE
的容量 。它的内部实现是一个链表。PriorityBlockingQueue
是一个没有边界的队列,它的排序规则和java.util.PriorityQueue
一样。需要注意,PriorityBlockingQueue
中允许插入null对象。所有插入PriorityBlockingQueue
的对象必须实现java.lang.Comparable
接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。SynchronousQueue
队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。
使用的最多的应该是LinkedBlockingQueue,注意一般情况下要配置一下队列大小,设置成有界队列,否则JVM内存会被撑爆!
饱和策略
线程池已满的定义,是指运行线程数==maximumPoolSize
,并且workQueue是有界队列并且已满(如果是无界队列当然永远不会满);这时候再提交任务怎么办呢?线程池会将任务传递给最后一个参数RejectedExecutionHandler
来处理,比如打印报错日志、抛出异常、存储到Mysql/redis用于后续处理等等,线程池默认也提供了几种处理方式:
- 在默认的
ThreadPoolExecutor.AbortPolicy
中,处理程序遭到拒绝将抛出运行时RejectedExecutionException
。 - 在
ThreadPoolExecutor.CallerRunsPolicy
中,线程调用运行该任务的execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。 - 在
ThreadPoolExecutor.DiscardPolicy
中,不能执行的任务将被删除。 - 在
ThreadPoolExecutor.DiscardOldestPolicy
中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) - 当然也可以自己实现处理策略类,继承
RejectedExecutionHandler
接口即可,该接口只有一个方法:void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
线程池配置优化
一般需要根据任务的类型来配置线程池大小:如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1。如果是IO密集型任务,参考值可以设置为2*NCPU。
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
其中NCPU的指的是CPU的核心数,可以使用
Runtime.getRuntime().availableProcessors()
来获取;
线程池的创建
使用Executor可以创建四种线程池:分别对应上边提到的四种线程池初始化方法
newCachedThreadPool
创建一个可缓存的线程池,如果线程池的长度超过了处理的需要,可以灵活回收空闲线程。如果没有可回收的就新建线程。1
2
3
4
5
6//源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}值得注意的一点是,
newCachedThreadPool
的返回值是ExecutorService类型,该类型只包含基础的线程池方法,但却不包含线程监控相关方法,因此在使用返回值为ExecutorService的线程池类型创建新线程时要考虑到具体情况。newFixedThreadPool
- 定长线程池,可以设置线程的最大并发数,超出在队列等待
1
2
3
4
5
6//源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}newSingleThreadExecutor
单线程化的线程池,用唯一的一个共用线程执行任务,保证所有任务按指定顺序执行(FIFO、优先级…)
1
2
3
4
5
6
7//源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}newScheduledThreadPool
定长线程池,支持定时和周期任务执行
1
2
3
4
5
6
7
8//源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,//此处super指的是ThreadPoolExecutor
new DelayedWorkQueue());
}ScheduledExecutorService
提供了三种方法可以使用:schedule
:在指定delay(延时)之后,执行提交Runnable的任务scheduleAtFixedRate
:以指定的速率执行任务scheduleWithFixedDelay
:以指定的延迟执行任务