0%

Java并发队列简介

JDK 中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,前者使用锁实现,而后者则使用CAS 非阻塞算法实现。

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,其底层数据结构使用单向链表实现,对于入队和出队操作使用CAS 来实现线程安全

ConcurrentLinkedQueue 的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个Node 节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个item 为null 的哨兵节点。第一次执行peek 操作时会把head 指向第一个真正的队列元素。由于使用非阻塞CAS 算法,没有加锁,所以在计算size 时有可能进行了offerpoll 或者remove 操作, 导致计算的元素个数不精确,所以在井发情况下size 函数不是很有用。

如图所示,入队、出队都是操作使用volatile 修饰的tailhead 节点,要保证在多线程下出入队线程安全,只需要保证这两个Node 操作的可见性和原子性即可。由于volatile 本身可以保证可见性,所以只需要保证对两个变量操作的原子性即可。

image-20211201121242572

LinkedBlockingQueue

LinkedBlockingQueue是使用独占锁实现的阻塞队列。LinkedBlockingQueue 的内部是通过单向链表实现的,使用头、尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作。

如图所示,对头、尾节点的操作分别使用了单独的独占锁从而保证了原子性,所以出队和入队操作是可以同时进行的。另外对头、尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队、出队操作实现了一个生产消费模型。

image-20211201121921403

ArrayBlockingQueue

ArrayBlockingQueue是有界数组方式实现的阻塞队列。

如图所示, ArrayBlockingQueue 通过使用全局独占锁实现了同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大。

其中,offerpoll 操作通过简单的加锁进行入队、出队操作,而puttake 操作则使用条件变量实现,如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。

另外,相比LinkedBlockingQueueArrayBlockingQueuesize操作的结果是精确的, 因为计算前加了全局锁。

image-20211201122330680

PriorityBlockingQueue

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。默认使用对象的compareTo 方法提供比较规则,如果你需要自定义比较规则则可以自定义comparators

PriorityBlockingQueue 队列在内部使用二叉树堆维护元素优先级,使用数组作为元素存储的数据结构,这个数组是可扩容的。当元素个数>=最大容量时会通过CAS 算法扩容。

PriorityBlockingQueue 类似于ArrayBlockingQueue在内部使用一个独占锁来控制,同时只有一个线程可以进行入队和出队操作。另外,PriorityBlockingQueue只使用了一个notEmpty 条件变量而没有使用notFull ,这是因为前者是无界队列。

image-20211201122801247

DelayQueue

DelayQueue 并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最快要过期的元素。

DelayQueue 内部使用PriorityQueue存放数据,使用ReentrantLock 实现线程同步。另外,队列里面的元素要实现Delayed 接口,由于每个元素都有一个过期时间,所以要实现获知当前元素还剩下多少时间就过期了的接口。

1
2
3
4
public interface Delayed extends Comparable<Delayed> {
//获知当前元素还剩下多少时间就过期
long getDelay(TimeUnit unit);
}

image-20211201185757443