0%

线程池

线程池

线程池与new Thread

线程池的好处

  • 重用存在的线程,减少对象创建、消亡的开销,性能好
  • 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞。
  • 提供定时执行、定期执行、单线程、并发数控制等功能。

new Thread的弊端

  • 每次new Thread 新建对象,性能差
  • 线程缺乏统一管理,可能无限制的新建线程,相互竞争,可能占用过多的系统资源导致死机或者OOM(out of memory 内存溢出),这种问题的原因不是因为单纯的new一个Thread,而是可能因为程序的bug或者设计上的缺陷导致不断new Thread造成的。
  • 缺少更多功能,如更多执行、定期执行、线程中断。

线程池的结构

image-20210504093153759

在线程池的类图中,我们最常使用的是最下边的Executors,用它来创建线程池使用线程。那么在上边的类图中,包含了一个Executor框架,它是一个根据一组执行策略的调用调度执行和控制异步任务的框架,目的是提供一种将任务提交与任务如何运行分离开的机制。它包含了三个executor接口:

  • Executor:运行新任务的简单接口
  • ExecutorService:扩展了Executor,添加了用来管理执行器生命周期和任务生命周期的方法
  • ScheduleExcutorService:扩展了ExecutorService,支持Future和定期执行任务

ThreadPoolExecutor

工作原理

这里写图片描述

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
  • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
  • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
  • 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
  • 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
  1. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  2. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

配置项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
  • maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
  • keepAliveTime:当线程数大于核心数时,多余的闲置线程在会在多长时间内被销毁
  • unit:keepAliveTime的单位
  • workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
  • threadFactory:线程工厂,用于创建线程,一般用默认即可;
  • handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;

线程池的阻塞队列

通道,有以下一些阻塞队列可供选择:

  • ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。
  • DelayQueue阻塞的是其内部元素,DelayQueue中的元素必须实现 java.util.concurrent.Delayed接口,该接口只有一个方法就是long getDelay(TimeUnit unit),返回值就是队列元素被释放前的保持时间,如果返回0或者一个负值,就意味着该元素已经到期需要被释放,此时DelayedQueue会通过其take()方法释放此对象,DelayQueue可应用于定时关闭连接、缓存对象,超时处理等各种场景;
  • LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。
  • PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。
  • SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

使用的最多的应该是LinkedBlockingQueue,注意一般情况下要配置一下队列大小,设置成有界队列,否则JVM内存会被撑爆!

饱和策略

线程池已满的定义,是指运行线程数==maximumPoolSize,并且workQueue是有界队列并且已满(如果是无界队列当然永远不会满);这时候再提交任务怎么办呢?线程池会将任务传递给最后一个参数RejectedExecutionHandler来处理,比如打印报错日志、抛出异常、存储到Mysql/redis用于后续处理等等,线程池默认也提供了几种处理方式:

  • 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时RejectedExecutionException
  • ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  • ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
  • 当然也可以自己实现处理策略类,继承RejectedExecutionHandler接口即可,该接口只有一个方法:
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

线程池配置优化

一般需要根据任务的类型来配置线程池大小:如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1。如果是IO密集型任务,参考值可以设置为2*NCPU。

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

其中NCPU的指的是CPU的核心数,可以使用Runtime.getRuntime().availableProcessors()来获取;

线程池的创建

使用Executor可以创建四种线程池:分别对应上边提到的四种线程池初始化方法

  • newCachedThreadPool
    创建一个可缓存的线程池,如果线程池的长度超过了处理的需要,可以灵活回收空闲线程。如果没有可回收的就新建线程。

    1
    2
    3
    4
    5
    6
    //源码:
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    值得注意的一点是,newCachedThreadPool的返回值是ExecutorService类型,该类型只包含基础的线程池方法,但却不包含线程监控相关方法,因此在使用返回值为ExecutorService的线程池类型创建新线程时要考虑到具体情况。

  • newFixedThreadPool

    • 定长线程池,可以设置线程的最大并发数,超出在队列等待
    1
    2
    3
    4
    5
    6
    //源码:
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor

    单线程化的线程池,用唯一的一个共用线程执行任务,保证所有任务按指定顺序执行(FIFO、优先级…)

    1
    2
    3
    4
    5
    6
    7
    //源码
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
  • newScheduledThreadPool

    定长线程池,支持定时和周期任务执行

    1
    2
    3
    4
    5
    6
    7
    8
    //源码:
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,//此处super指的是ThreadPoolExecutor
    new DelayedWorkQueue());
    }

    ScheduledExecutorService提供了三种方法可以使用:

    • schedule:在指定delay(延时)之后,执行提交Runnable的任务

    • scheduleAtFixedRate:以指定的速率执行任务

    • scheduleWithFixedDelay:以指定的延迟执行任务

java线程池ThreadPoolExecutor类使用详解 - bigfan - 博客园 (cnblogs.com)