java原生队列

队列简介

先进先出(FIFO),生产者往队列里发布事件,消费者获得通知消费事件;如果队列中没有事件时,消费者堵塞,直到生产者发布了新事件。

Queue 接口与 List、Set 同一级别,都是继承了 Collection 接口。LinkedList 实现了 Deque 接口。

Java 内置队列

队列 有界性 阻塞 数据结构 描述
ArrayBlockingQueue bounded 加锁 arraylist 一个用数组实现的有界阻塞队列,此队列按照先进先出 (FIFO) 的原则对元素进行排序。 支持公平锁和非公平锁。
LinkedBlockingQueue optionally-bounded 加锁 linkedlist 一个由链表结构组成的有界队列,此队列按照先进先出 (FIFO) 的原则对元素进行排序。此队列的默认长度为 Integer.MAX_VALUE,所以默认创建的该队列有容量危险。
LinkedBlockingDeque optionally-bounded 加锁 linkedlist 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竟争最多降到一半。
ConcurrentLinkedQueue unbounded CAS linkedlist 一个采用双向链表实现的无界并发非阻塞队列,它属于 LinkedQueue 的安全版本。 ConcurrentLinkedQueue 内部采用 CAS 操作保证线程安全,这是非阻塞队列实现的基础,相比 ArrayBlockingQueue、LinkedBlockingQueue 具备较高的性能。
LinkedTransferQueue unbounded CAS linkedlist 一个由链表结构组成的无界阻塞队列,相对于其它队列, LinkedTransferQueue 队列多了 transfer 和 tryTransfer 方法。
PriorityBlockingQueue unbounded 加锁 heap 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现 compareTo() 方法来指定元素排序规则,不能保证同优先级元素的顺序。
DelayQueue unbounded 加锁 heap 一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。 只有延时期满后才能从队列中获取元素。
SynchronousQueue synchronized CAS linkedlist 一个不存储元素的阻塞队列,每一个put操作必须等待 take 操作,否则不能添加元素。 支持公平锁和非公平锁。 SynchronousQueue 的一个使用场景是在线程池里。 Executors.newCachedThreadPool() 就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

队列加锁性能

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor 论文中讲述了一个实验:

  • 这个测试程序调用了一个函数,该函数会对一个 64 位的计数器循环自增 5 亿次。
  • 机器环境:2.4G 6 核
  • 运算: 64 位的计数器累加 5 亿次
Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

CAS 操作比单线程无锁慢了 1 个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢 3 个数量级。可见无锁速度最快。

单线程情况下,不加锁的性能 > CAS 操作的性能 > 加锁的性能。

多线程情况下,为了保证线程安全,必须使用 CAS 或锁,这种情况下,CAS 的性能超过锁的性能,前者大约是后者的 8 倍。

综上可知,加锁的性能是最差的。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。

ArrayBlockingQueue 必须指定长度,且一旦创建,容量不能改变。

ArrayBlockingQueue 采用 ReentrantLock 来控制并发,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。

ArrayBlockingQueue 默认是非公平队列。

ArrayBlockingQueue 用 Object[]存储对象。

  1. 线程安全是指:ArrayBlockingQueue 内部通过“互斥锁”保护竞争资源,实现了多线程对竞争资源的互斥访问。
  2. 有界是指:ArrayBlockingQueue 对应的数组是有界限的。
  3. 阻塞:是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待。
  4. 所谓公平的访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,先阻塞的线程先访问 ArrayBlockingQueue 队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才能够访问队列。然而为了保证公平性,通常会降低吞吐量。

LinkedBlockingQueue

LinkedBlockingQueue 是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为 Integer.MAX_VALUE,由于这个数值特别大,因此在很多地方称 LinkedBlockingQueue 是一个无界队列。在 LinkedBlockingQueue 进行初始化时,可以手动指定队列的大小,这样 LinkedBlockingQueue 就是一个有界队列了。所以说 LinkedBlockingQueue 是一个可选择的有界队列。

LinkedBlockingQueue 采用 ReentrantLock 来控制并发。但是和 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 对队头和队尾各自使用了一把锁来做并发控制。LinkedBlockingQueue 采用的是 wait-notify 机制实现的,不过没有用 Object 提供的,用 java.util.concurrent.locks.Condition 中的 await()和 signal()配合锁实现。

在 put()方法中, 关键点在于搞清楚下面俩点:

1
2
1. if (c + 1 < capacity) notFull.signal();
2. if (c == 0) signalNotEmpty();
  1. 语句 1 是判断在多线程的环境下起到作用, 假设现在有 4 个线程都在 await()处阻塞, take()取出一个数据, 现在唤醒了一个线程, 那么当该线程继续 put 的时候, 通过该判断, 如果队列非满则将阻塞在 await()出的线程继续唤醒, 直到队列满了或者全部唤醒。
  2. 语句 2 是由于前边是调用的 count.getAndIncrement(), 如果 c 为 0, 那么现在队列里就有了一个元素, 唤醒阻塞在出列的 await()处的线程, 可以继续出列, 取数据了.

LinkedBlockingQueue 用 Node 存储对象。

总结

  1. LinkedBlockingQueue 是通过锁分离的方式进行控制,减少了 take 和 put 之间的锁竞争。
  2. LinkedBlockingQueue 是通过链表的方式实现,所以进行锁分离时不会冲突,因为入队和出队分别作用于队尾和队首。
  3. 内部采用了原子操作类(CAS)进行控制链表长度。
  4. 入队后,如果之前队列为空时,会通知 take 方法,队列已有数据可进行 take,反之,出队后,队列之前已满,则通知 put 方法,队列已有空闲位置可进行 put 操作。

tip(wait-notify 机制)

从整体上来看 Object 的 wait 和 notify/notify 是与对象监视器配合完成线程间的等待/通知机制,而 Condition 与 Lock 配合完成等待通知机制,前者是 java 底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。两者除了在使用方式上不同外,在功能特性上还是有很多的不同:

  1. Condition 能够支持不响应中断,而通过使用 Object 方式不支持;
  2. Condition 能够支持多个等待队列(new 多个 Condition 对象),而 Object 方式只能支持一个;
  3. Condition 能够支持超时时间的设置,而 Object 不支持。

Condition 详细原理参见TODO

源码定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
}

LinkedBlockingDeque

LinkedBlockingDeque 队列和 LinkedBlockingQueue 队列性质相似,不过 LinkedBlockingDeque 队列是双端队列,有 1 个 lock 和 2 个 condition

源码定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {

/**
* Pointer to first node.
* Invariant: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
transient Node<E> first;

/**
* Pointer to last node.
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;

/** Number of items in the deque */
private transient int count;

/** Maximum number of items in the deque */
private final int capacity;

/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();

/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();

/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
}

ConcurrentLinkedQueue

它是一个采用双向链表实现的无界并发非阻塞队列,它属于 LinkedQueue 的安全版本。ConcurrentLinkedQueue 内部采用 CAS 操作保证线程安全,这是非阻塞队列实现的基础,相比 ArrayBlockingQueue、LinkedBlockingQueue 具备较高的性能。

ConcurrentLinkedQueue 使用约定:

  1. 不允许 null 入列
  2. 在入队的最后一个元素的 next 为 null
  3. 队列中所有未删除的节点的 item 都不能为 null 且都能从 head 节点遍历到
  4. 删除节点是将 item 设置为 null, 队列迭代时跳过 item 为 null 节点
  5. head 节点跟 tail 不一定指向头节点或尾节点,可能存在滞后性

注意

ConcurrentLinkedQueue 中的 tail 节点不一定是最后一个节点,他可能是倒数第二个。所以 ConcurrentLinkedQueue 判断队尾条件是节点的 next 为 null。

HOPS(延迟更新的策略)的设计

tail 和 head 是延迟更新的,两者更新触发时机为:

  • tail 更新触发时机:当 tail 指向的节点的下一个节点不为 null 的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过 casTail 进行 tail 更新;当 tail 指向的节点的下一个节点为 null 的时候,只插入节点不更新 tail。
  • head 更新触发时机:当 head 指向的节点的 item 域为 null 的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过 updateHead 进行 head 更新;当 head 指向的节点的 item 域不为 null 的时候,只删除节点不更新 head。

LinkedTransferQueue

LinkedTransferQueue 是在 JDK1.7 时,J.U.C 包新增的一种比较特殊的阻塞队列,它除了具备阻塞队列的常用功能外,还有一个比较特殊的 transfer 方法。

LinkedTransferQueue 提供了两种构造器,也没有参数设置队列初始容量,所以是一种无界队列

注意

LinkedTransferQueue 的单向链表中一定会有至少一个 Node 节点,既是 LinkedTransferQueue 队列集合通过默认的构造函数进行实例化时构建的“虚”节点,该节点的 isData 属性被标识为 true,并且和该节点 item 属性实际引用数据对象的情况冲突(item 属性为 null),这样一来无论 xfer 操作进行的是入队操作还是出队操作,这个虚拟节点都会被排除在操作逻辑以外。并且因为 q = p.next 和 p = q 两个操作语句的缘故,代表当前正在处理的 p 引用会向单向链表的后续结点移动。

常用方法

transfer 方法

用于将指定元素 e 传递给消费者线程(调用 take/poll 方法)。如果有消费者线程正在阻塞等待,则调用 transfer 方法的线程会直接将元素传递给它;如果没有消费者线程等待获取元素,则调用 transfer 方法的线程会将元素插入到队尾,然后阻塞等待,直到出现一个消费者线程获取元素。

在普通阻塞队列中,当队列为空时,消费者线程(调用 take 或 poll 方法的线程)一般会阻塞等待生产者线程往队列中存入元素。而 LinkedTransferQueue 的 transfer 方法则比较特殊:

  1. 当有消费者线程阻塞等待时,调用 transfer 方法的生产者线程不会将元素存入队列,而是直接将元素传递给消费者。
  2. 如果调用 transfer 方法的生产者线程发现没有正在等待的消费者线程,则会将元素入队,然后会阻塞等待,直到有一个消费者线程来获取该元素。

TransferQueue 还提供了两个变种方法:tryTransfer(E e)、tryTransfer(E e, long timeout, TimeUnit unit)。

tryTransfer(E e)

当生产者线程调用 tryTransfer 方法时,如果没有消费者等待接收元素,则会立即返回 false。该方法和 transfer 方法的区别就是 tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法必须等到消费者消费后才返回。

tryTransfer(E e, long timeout, TimeUnit unit)

tryTransfer(E e,long timeout,TimeUnit unit)方法则是加上了限时等待功能,如果没有消费者消费该元素,则等待指定的时间再返回;如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。

xfer 方法

xfer 方法从字面上可以直译为“传送”,它是指通过多种操作模式,利用 LinkedTransferQueue 队列内置的单向链表,使数据对象在生产者和消费者间进行传递。JDK 9+开始,xfer 操作的逻辑做了一次较大的改造,处理逻辑变得更加高效。

xfer 方法是 LinkedTransferQueue 队列最核心的操作方法之一,其支撑了诸如 offer、add、put、transfer、tryTransfer、take、poll 等方法的内部实现。

xfer 方法参数

1
2
3
private E xfer(E e, boolean haveData, int how, long nanos) {
// ......
}
  1. e:该参数就是本次进行传输的数据对象,如果当前 xfer 方法被消费者线程端调用,则 e 为 null。
  2. haveData:该参数指示本次 xfer 方法的调用是否有数据对象通过上一个 e 参数进行传入,也就是说 e 和 haveData 这两个参数是配对使用的。当 e 为 null 时,haveData 应该为 false;反之当 e 不为 nul 时,haveData 应该为 true。
  3. how:本次 xfer 方法的操作模式。一共有四种: NOW, ASYNC, SYNC, TIMED。
  4. nanos:本次 xfer 方法的操作超时时间(单位纳秒),当本次操作的操作模式为 TIMED 时(限时/超时模式),需要通过该参数指定本次操作的超时时间。

xfer 工作模式

xfer 的调用方式主要分为四种工作模式,在 xfer 方法的入参中表现为四个不同的数值:

1
2
3
4
5
6
7
/*
* xfer方法的入参, 不同类型的方法内部调用xfer方法时入参不同.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

这四个常量值,作为 xfer 方法的入参,用于标识不同操作类型。其实从常量的命名也可以看出它们对应的操作含义:

NOW

表示即时操作(可能失败),即不会阻塞调用线程:

  • poll(获取并移除队首元素,如果队列为空,直接返回 null);
  • tryTransfer(尝试将元素传递给消费者,如果没有等待的消费者,则立即返回 false,也不会将元素入队)
ASYNC

表示异步操作(必然成功):

  • offer(插入指定元素至队尾,由于是无界队列,所以会立即返回 true);
  • put(插入指定元素至队尾,由于是无界队列,所以会立即返回);add(插入指定元素至队尾,由于是无界队列,所以会立即返回 true)
SYNC

表示同步操作(阻塞调用线程):

  • transfer(阻塞直到出现一个消费者线程);
  • take(从队首移除一个元素,如果队列为空,则阻塞线程)
TIMED

表示限时同步操作(限时阻塞调用线程):

  • poll(long timeout, TimeUnit unit);
  • tryTransfer(E e, long timeout, TimeUnit unit)

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// ......
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null)) {
throw new NullPointerException();
}
// 最外层的for循环,遵循cas操作思想,只要操作不符合预期,就不停的重新操作
// 直到操作结果符合预期为止
restart: for (Node s = null, t = null, h = null;;) {
// 这是初始化是决定当前p的位置是依据当前单向链表的head节点进行引用还是依据当前单向链表的tail节点进行引用
// 其本质判断是当前操作是入队操作还是出队操作
// 其最本质的判定是当前xfer操作的性质(haveData)和当前链表tail引用位置所描述的操作性质(t.isData)是否一致
// 如果操作性质一致,当前xfer操作就是从tail引用位置开始判定和进行的入队操作
// 如果操作性质不一致,当前xfer操作就是从head引用位置开始判定和进行的出队操作
for (Node p = (t != (t = tail) && t.isData == haveData) ? t : (h = head);; ) {
final Node q; final Object item;
// 出队操作的场景,其处理策略在此代码段落
// 只有当前处理节点p的isData标识和入参的haveData标识一致
// 且当前处理节点p真实的数据对象存在情况和入参的haveData标识一致
if (p.isData != haveData && haveData == ((item = p.item) == null)) {
// 将局部变量h引用与当前单向链表的head位置
// 避免在多线程情况下head引用被改变引起的处理错误
if (h == null) {
h = head;
}
// 对当前节点进行原子性赋值:
// 如果是生产者任务从队列中取出,那么赋值成功后,当前节点p的item属性将为e(不会为null)
// 如果是消费者任务从队列中取出,那么赋值成功后,当前节点p的item属性将为null
if (p.tryMatch(item, e)) {
// 在双跳队列进行数据取数操作时,当前处理节点p可能和h不一致,但一定是在h代表的节点“附近”
// 所以,如果条件成立,就要进行以h代表的节点为基准的链表清理操作
if (h != p) {
skipDeadNodesNearHead(h, p);
}
return (E) item;
}
}
// 入队操作的场景,其处理策略在此代码段落
// 加入队列的可能是消费者任务,也可能是生产者任务
// 根据之前对单向链表tail引用位置的描述,tail引用的位置不一定是单向链表的最后一个节点
// 所以首先将p节点移动到链表的最后一个节点,否则就不进行业务逻辑处理
if ((q = p.next) == null) {
// 操作方式为NOW的入队操作,将会被忽略
if (how == NOW) {
return e;
}
// 入队操作需要生成一个新的Node节点
if (s == null) {
s = new Node(e);
}
// 使用原子操作,将当前操作s结点引用到当前p结点的item属性
// 如果操作失败,说明p结点的next操作已经被其它线程中的操作所引用,
// 那么通过内层的for循环继续进行操作
if (!p.casNext(null, s)) {
continue;
}
// 当前p节点引用的位置和t节点引用的可能是单向链表tail处的位置可能不一样
// 引起这个的原因可能有很多:
// a、当前xfer操作在中为p节点关联next属性的操作:p.casNext(null, s)不停失败,
// 不停的在第二层for循环中做q = p.next 和 p == (p = q) 操作
// b、虽然xfer操作成功了,但是当前线程连续进行了两次xfer调用操作(不好理解?后文将进行图例化讲解)
if (p != t) {
casTail(t, s);
}
if (how == ASYNC) {
return e;
}
return awaitMatch(s, p, e, (how == TIMED), nanos);
}
// 让p引用指向当前节点的下一个节点
// 如果当前节点的next属性指向自己,说明当前节点已经被移除队列
// 按照cas的思路,本次xfer操作需要重来
if (p == (p = q)) {
continue restart;
}
}
}
}

// ......
/** Tries to CAS-match this node; if successful, wakes waiter. */
// 这是LinkedTransferQueue.Node类中的方法
// 方法尝试如果当前Node对象的item属性值为cmp的情况下,重新赋值为val
// 如果设置成功,则解除当前Node所代表的等待线程的阻塞状态
// 这个阻塞状态的线程可能是生产者,也可能是生产者。
final boolean tryMatch(Object cmp, Object val) {
if (casItem(cmp, val)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}

// 这是LinkedTransferQueue.Node类中的方法
// 该方法尝试如果当前Node对象的item属性值为cmp的情况下,重新赋值为val,并返回true
// 否则就返回false
final boolean casItem(Object cmp, Object val) {
// assert isData == (cmp != null);
// assert isData == (val == null);
// assert !(cmp instanceof Node);
return ITEM.compareAndSet(this, cmp, val);
}
// ......

PriorityBlockingQueue

PriorityBlockingQueue 是一个基于堆的无界并发安全的优先级队列,每次出队都返回优先级最高的元素,数据结构是一个二叉树最小堆算法维护的数组,这个数组是可扩容的,直接遍历队列元素是无序的。

PriorityBlockingQueue 可以理解为 public 操作都加锁的 PriorityQueue,通过排他锁保证了操作的线程安全。PriorityBlockingQueue 扩容时,因为增加堆数组的长度并不影响队列中元素的出队操作,因而使用自旋 CAS 操作实现的锁来控制扩容操作,仅在数组引用替换和拷贝元素时才加锁,从而减少了扩容对出队操作的影响。

PriorityBlockingQueue 不允许 null 值,不允许未实现 Comparable 接口的对象。

PriorityQueue 特点

  1. PriorityQueue 是基于优先堆的一个无界队列,这个优先队列中的元素可以默认自然排序或者通过提供的 Comparator(比较器)在队列实例化的时排序。
  2. PriorityQueue 不允许 null 值,而且不支持 non-comparable(不可比较)的对象,比如用户自定义的类。优先队列要求使用 Java Comparable 和 Comparator 接口给对象排序,并且在排序时会按照优先级处理其中的元素。
  3. PriorityQueue 的头是基于自然排序或者 Comparator 排序的最小元素。如果有多个对象拥有同样的排序,那么就可能随机地取其中任意一个。当我们获取队列时,返回队列的头对象。
  4. PriorityQueue 的大小是不受限制的,所以put 永远不会被阻塞。但在创建时可以指定初始大小,当我们向优先队列增加元素的时候,队列大小会自动增加
  5. PriorityQueue 是非线程安全的,所以 Java 提供了 PriorityBlockingQueue(实现 BlockingQueue 接口)用于 Java 多线程环境。

DelayQueue

DelayQueue 是一个无界的 BlockingQueue,用于放置实现了 Delayed 接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最短

DelayQueue 不允许 null 值。

内部结构

  • 可重入锁
  • 用于根据 delay 时间排序的优先级队列
  • 用于优化阻塞通知的线程元素 leader
  • 用于实现阻塞和通知的 Condition 对象

SynchronousQueue

经典的生产者-消费者模式,操作流程是这样的:

  • 有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;
  • 有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞;

SynchronousQueue 也是一个队列来的,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即 put 的时候),如果当前没有人想要消费产品(即当前没有线程执行 take),此生产线程必须阻塞,等待一个消费线程调用 take 操作,take 操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递)

  1. SynchronousQueue 没有容量。与其他 BlockingQueue 不同,SynchronousQueue 是一个不存储元素的 BlockingQueue。每一个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然。
  2. 因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如 clear 是不执行任何操作的,contains 始终返回 false,peek 始终返回 null。
  3. SynchronousQueue 分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为 true 即可)。
  4. SynchronousQueue 底层有两种数据结构:队列(TransferQueue,实现公平策略)和栈(TransferStack,实现非公平策略)
  5. 若使用 TransferQueue, 则队列中永远会存在一个 dummy node。

参考资料

  1. 高性能队列——Disruptor
  2. 详解 Condition 的 await 和 signal 等待/通知机制
  3. LinkedBlockingQueue 原理详解
  4. SynchronousQueue 原理解析
  5. JUC 集合: ConcurrentLinkedQueue 详解
  6. 源码阅读(39):Java 中线程安全的 Queue、Deque 结构——LinkedTransferQueue(2)
  7. DelayQueue 详解