JCTools - 增强的并发工具

JCTools 简介

早在 96 年就有论文提出了无锁队列的概念,再到后来 Disruptor,高性能已得到生产的验证。此处介绍的 Jctools 中的高性能队列,其性能丝毫不输于 Disruptor。

JCTools (Java Concurrency Tools) 提供了一系列非阻塞并发数据结构(标准 Java 中缺失的),当存在线程争抢的时候,非阻塞并发数据结构比阻塞并发数据结构能提供更好的性能。

JCTools 是一个开源工具包,在 Apache License 2.0 下发布,并在 Netty、Rxjava 等诸多框架中被广泛使用。

JCTools 的开源 Github 仓库:https://github.com/JCTools/JCTools

在 Maven 中引入 JCtools jar 包就能使用 JCTools 了:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.jctools/jctools-core -->
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>4.0.1</version>
</dependency>

Netty 中直接引入了 JCTools 的 Mpsc Queue,Caffeine 中引入了 JCTools 的 Mpsc Queue,复制了其中的代码,然后简单改了下。

JCTools 中主要提供了 Map 以及 Queue 的非阻塞并发数据结构

非阻塞 Map

  • ConcurrentAutoTable(后面几个 map/set 结构的基础)
  • NonBlockingHashMap
  • NonBlockingHashMapLong
  • NonBlockingHashSet
  • NonBlockingIdentityHashMap
  • NonBlockingSetInt

NonBlockingHashMap 是对 ConcurrentHashMap 的增强,对多 CPU 的支持以及高并发更新提供更好的性能。

NonBlockingHashMapLong 是 key 为 Long 型的 NonBlockingHashMap。

NonBlockingHashSet 是对 NonBlockingHashMap 的简单包装以支持 set 的接口。

NonBlockingIdentityHashMap 是从 NonBlockingHashMap 改造来的,使用 System.identityHashCode() 来计算哈希。

NonBlockingSetInt 是一个使用 CAS 的简单的 bit-vector。

非阻塞 Queue

JCTools 提供的非阻塞队列分为 4 类,可以根据不同的应用场景选择使用:

  • SPSC-单一生产者单一消费者(有界和无界)
  • MPSC-多生产者单一消费者(有界和无界)
  • SPMC-单生产者多消费者(有界)
  • MPMC-多生产者多消费者(有界)

这里解释一下 MSPC 的含义,如下:

  • M:Multiple,多个的
  • S:Single,单个的
  • P:Producer,生产者
  • C:Consumer,消费者

“生产者”和“消费者”是指“生产线程”和“消费线程”。

1
2
3
4
5
6
7
8
9
10
11
// spsc-有界/无界队列
Queue<String> spscArrayQueue = new SpscArrayQueue(16);
Queue<String> spscUnboundedArrayQueue = new SpscUnboundedArrayQueue(2);
// spmc-有界队列
Queue<String> spmcArrayQueue = new SpmcArrayQueue<>(16);
// mpsc-有界/无界队列
Queue<String> mpscArrayQueue = new MpscArrayQueue<>(16);
Queue<String> mpscChunkedArrayQueue = new MpscChunkedArrayQueue<>(1024, 8 * 1024);
Queue<String> mpscUnboundedArrayQueue = new MpscUnboundedArrayQueue<>(2);
// mpmc-有界队列
Queue<String> mpmcArrayQueue = new MpmcArrayQueue<>(16);

JCTools 是一款对 jdk 并发数据结构进行增强的并发工具,主要提供了 map 以及 queue 的增强数据结构。原来 netty 还是自己写的 MpscLinkedQueueNode,后来新版本就换成使用 JCTools 的并发队列了。

Mpsc Queue

多生产者单消费者的使用场景,最佳的案例有:

  • Netty Reactor 线程中任务队列 taskQueue 必须满足多个生产者可以同时提交任务
  • Caffeine 中多个生产者业务线程,去进行缓存写入操作, 而只有单个的维护线程,基于 W-TinyLRU 进行访问记录的维护

所以 JCTools 提供的 Mpsc Queue 非常适合:

  • Netty 异步任务场景
  • Caffeine 的写入操作访问记录维护场景

Mpsc Queue 有多种的实现类,例如 MpscArrayQueue, MpscUnboundedArrayQueue, MpscChunkedArrayQueue 等。

MpscArrayQueue

MpscArrayQueue 的继承关系如下图:

queue-jctools-MpscArrayQueue-1

从上图我们可以看到每个有包含属性的类后面都会被 MpscXxxPad 类隔开

这是 jctools 中用到的缓存行填充技术

缓存行填充解决了伪共享(False Sharing)问题,提高多线程并发访问共享变量时的性能。

在现代计算机系统中,CPU 缓存以缓存行(Cache Line)为单位进行数据加载和存储。缓存行是一个固定大小的数据块,在 x86 架构中通常为 64 字节。当一个线程访问一个共享变量时,它会加载所在缓存行的数据到 CPU 缓存中。如果多个线程同时访问位于同一缓存行的不同变量,会导致伪共享问题。

伪共享问题的产生是由于多个线程的缓存行之间存在数据竞争,即它们共享同一缓存行,但实际上它们操作的变量是不同的。由于缓存一致性协议的限制,一个线程对共享变量的修改会导致整个缓存行的数据无效,从而迫使其他线程重新加载整个缓存行的数据,这会导致严重的性能下降。

缓存行填充的作用是通过在变量之间插入空白字段,使得共享变量被放置在不同的缓存行中,从而避免了多个线程之间对同一缓存行的竞争。这样,即使多个线程同时访问不同的共享变量,它们的操作不会影响彼此,也不会导致缓存行失效,从而显著提高了并发访问性能。

缓存行填充详细原因及原理: 对齐填充的应用

MpscUnboundedArrayQueue

Caffeine 和 Netty,所使用的并不是原始的 MpscArrayQueue ,而是其高性能的子类 MpscGrowableArrayQueue ,来看看其数据结构。

MpscGrowableArrayQueue 是基于数组 + 链表的复合结构。

基于数组 + 链表的复合结构的优势:

  1. 不会像链表那样分配过多的 Node,吞吐量比传统的链表高
  2. 扩容的时候,也不存在数组复制,扩容的速度,也比传统的数组快

源码分析

入队 offer 的源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E>
implements MessagePassingQueue<E>, QueueProgressIndicators
{
@Override
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}

long mask;
E[] buffer; // 生产者指向的数组
long pIndex; // 生产索引

while (true)
{
// 获取生产者索引最大限制
long producerLimit = lvProducerLimit();
// 获取生产者索引
pIndex = lvProducerIndex();
// 生产索引以 2 为步长递增,
// 第 0 位标识为 resize ,所以非扩容场景,不会是奇数,
// 扩容的时候,会在 offerSlowPath() 中扩容线程会将其设为奇数
// lower bit is indicative of resize, if we see it we spin until it's cleared
if ((pIndex & 1) == 1)
{
// 奇数代表正在扩容,自旋,等待扩容完成
continue;
}
// pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
// pIndex 是偶数, 实际的索引值需要除以 2

// mask/buffer may get changed by resizing -> only use for array access after successful CAS.
mask = this.producerMask;
buffer = this.producerBuffer;
// a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex)

// assumption behind this optimization is that queue is almost always empty or near empty
// 阈值 producerLimit 小于等于生产者指针位置 pIndex, 需要扩容
if (producerLimit <= pIndex)
{
// 通过 offerSlowPath 返回状态值,来查看怎么来处理这个待添加的元素
int result = offerSlowPath(mask, pIndex, producerLimit);
switch (result)
{
// producerLimit 虽然达到了 limit,
// 但是当前数组已经被消费了部分数据,暂时不会扩容,会使用已被消费的槽位
case CONTINUE_TO_P_INDEX_CAS:
break;
// 可能由于并发原因导致 CAS 失败,那么则再次重新尝试添加元素
case RETRY:
continue;
// 队列已满,直接返回false操作
case QUEUE_FULL:
return false;
// 队列需要扩容操作
case QUEUE_RESIZE:
// 对队列进行直接扩容操作
resize(mask, buffer, pIndex, e, null);
return true;
}
}

// 运行到这里,阈值 producerLimit 一定大于生产者指针位置 pIndex
// 直接通过 CAS 操作对 pIndex 做加 2 处理
if (casProducerIndex(pIndex, pIndex + 2))
{
break;
}
}

// 有序写入数组元素
// INDEX visible before ELEMENT
final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
// 将 buffer 数组的指定位置替换为 e,
// 不是根据下标来设置的,是根据槽位的地址偏移量 offset, UNSAFE 操作
soRefElement(buffer, offset, e); // release element e
return true;
}
}

从上面源码可以看出 offer 是有序写入数组元素

offer 写入的时候,jctool 根据 pIndex 进行位运算计算得到数组对应的下标,然后通过 soRefElement 方法将数据写入到数组中

MpscUnboundedArrayQueue 的 offer(final E e) 是调用的父类 BaseMpscLinkedArrayQueue 的 offer(final E e) 方法

offerSlowPath()

offerSlowPath() 会告诉线程队列是满了,还是需要扩容,还是需要自旋重试。

总之,这个一条慢路径,所以叫做 slow path。

虽然 producerIndex 达到了 producerLimit ,但不代表队列就非得扩容。

如果消费者已经消费了部分生产者指向的数组元素,就意味这当前数组还是有槽位可以继续用的,暂时不用扩容。

下面是 offerSlowPath() 方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E>
implements MessagePassingQueue<E>, QueueProgressIndicators
{
/**
* We do not inline resize into this method because we do not resize on fill.
*/
private int offerSlowPath(long mask, long pIndex, long producerLimit)
{
// 消费者索引
final long cIndex = lvConsumerIndex();
// 数组缓冲的容量,(长度-1) * 2
long bufferCapacity = getCurrentBufferCapacity(mask);

// 消费索引 + 当前数组的容量 > 生产索引,代表当前数组已有部分元素被消费了,
// 所以不会扩容,会使用已被消费的槽位。
if (cIndex + bufferCapacity > pIndex)
{
if (!casProducerLimit(producerLimit, cIndex + bufferCapacity))
{
// CAS失败,自旋重试
// retry from top
return RETRY;
}
else
{
// 重试用 CAS 方式修改生产索引
// continue to pIndex CAS
return CONTINUE_TO_P_INDEX_CAS;
}
}
// 根据生产者和消费者索引判断 Queue 是否已满,无界队列永不会满
// full and cannot grow
else if (availableInQueue(pIndex, cIndex) <= 0)
{
// offer should return false;
return QUEUE_FULL;
}
// 用 CAS 的方式将 producerIndex 加 1, 奇数代表正在 resize
// grab index for resize -> set lower bit
else if (casProducerIndex(pIndex, pIndex + 1))
{
// trigger a resize
return QUEUE_RESIZE;
}
else
{
// failed resize attempt, retry from top
return RETRY;
}
}
}

如果需要扩容,线程会 CAS 操作将 producerIndex 改为奇数,让其它线程能感知到队列正在扩容,要生产数据的线程先自旋,等待扩容完成再继续操作。

offer() 过程中,通过 CAS 抢槽位,CAS 失败的线程自旋重试。

如果遇到队列需要扩容,则将 producerIndex 设为奇数,其他线程自旋等待,一直等到扩容完成,扩容后再设为偶数,通知其它线程继续生产。

resize()

resize() 是扩容的核心方法,

它首先会创建一个相同长度的新数组,将 producerBuffer 指向新数组,然后将元素 e 放到新数组中,

旧元素的最后一个元素指向新数组,形成链表。

还会将旧元素的槽位填充 JUMP 元素,代表队列扩容了。

下面是 resize() 方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E>
implements MessagePassingQueue<E>, QueueProgressIndicators
{
/**
* 扩容:
* 新建一个 E[], 将 oldBuffer 和 newBuffer 建立连接。
* 将 producerBuffer 指向新数组,然后将元素 e 放到新数组中,
* 旧元素的最后一个元素指向新数组,形成链表。
* 还会将旧元素的槽位填充 JUMP 元素,代表队列扩容了。
*/
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
{
assert (e != null && s == null) || (e == null || s != null);
// 下一个 Buffer 的长度, MpscQueue 会构建一个相同长度的 Buffer
int newBufferLength = getNextBufferSize(oldBuffer);
final E[] newBuffer;
try
{
// 创建一个新的 E[]
newBuffer = allocateRefArray(newBufferLength);
}
catch (OutOfMemoryError oom)
{
assert lvProducerIndex() == pIndex + 1;
soProducerIndex(pIndex);
throw oom;
}

// 生产者 Buffer 指向新的 E[]
producerBuffer = newBuffer;
// 计算新的 mask, Buffer 长度不变的情况下, mask 也不变
final int newMask = (newBufferLength - 2) << 1;
producerMask = newMask;

// 根据该偏移量设置 oldBuffer 的 JUMP 元素,会递增然后重置,不断循环
final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
// mask 不变的情况下, oldBuffer 的 JUMP 对应的位置,就是 newBuffer 中要消费的位置
final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

// 元素 e 放到新数组中
soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
// 旧数组和新数组建立连接,旧数组的最后一个元素保存新数组的地址
soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

// ASSERT code
final long cIndex = lvConsumerIndex();
final long availableInQueue = availableInQueue(pIndex, cIndex);
RangeUtil.checkPositive(availableInQueue, "availableInQueue");

// Invalidate racing CASs
// We never set the limit beyond the bounds of a buffer
soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

// make resize visible to the other producers
soProducerIndex(pIndex + 2);

// INDEX visible before ELEMENT, consistent with consumer expectation

// make resize visible to consumer
soRefElement(oldBuffer, offsetInOld, JUMP);
}
}

出队 poll 的源码分析

poll() 方法核心思路是获取消费者索引 consumerIndex ,然后根据 consumerIndex 计算得出数组对应的偏移量,然后将数组对应位置的元素取出并返回,最后将 consumerIndex 移动到环形数组下一个位置。

如果元素为 null,并不代表队列是空的,还要比较 consumerIndex 和 producerIndex ,如果两者索引不同,那么 producerIndex 肯定是大于 consumerIndex 的,说明生产者已经在生产了,移动了 producerIndex ,只是还没来得及将元素填充到数组而已。

因为生产者是先 CAS 递增 producerIndex,再将元素填充到数组的,两步之间存在一个非常短的时间差,如果消费者恰好在这个时间差内去消费数据,那么就自旋等待一下,等待生产者填充元素到数组。

如果元素为 JUMP ,说明队列扩容了,消费者需要根据数组的最后一个元素找到扩容后的新数组,消费新数组的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E>
implements MessagePassingQueue<E>, QueueProgressIndicators
{
/**
* {@inheritDoc}
* <p>
* This implementation is correct for single consumer thread use only.
* poll() 没有做并发控制, MpscQueue 是多生产单消费者的 Queue ,只有一个消费者,这个也是 netty 换成 MSPCQueue 的主要原因
*/
@SuppressWarnings("unchecked")
@Override
public E poll()
{
final E[] buffer = consumerBuffer;
final long cIndex = lpConsumerIndex();
final long mask = consumerMask;

final long offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
Object e = lvRefElement(buffer, offset);
if (e == null)
{
long pIndex = lvProducerIndex();
// isEmpty?
// 元素已经消费完
if ((cIndex - pIndex) / 2 == 0)
{
return null;
}
// offer() 时生产者先 CAS 改 producerIndex,再设置元素,中间会有一个时间差,此时会自旋,等待元素设置完成
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
// spin until element is visible.
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
}

// 队列扩容了
if (e == JUMP)
{
// 获取下一个数组
final E[] nextBuffer = nextBuffer(buffer, mask);
// 从下一个数组中消费
return newBufferPoll(nextBuffer, cIndex);
}

// 取出元素后,将原来的槽位设为 null
soRefElement(buffer, offset, null); // release element null
// 递增 consumerIndex
soConsumerIndex(cIndex + 2); // release cIndex
return (E) e;
}
}
nextBuffer()

如果队列扩容了,nextBuffer() 会找到扩容后的新数组,同时它还会将旧数组的最后一个元素设为 BUFFER_CONSUMED,代表当前数组已经被消费完了,也就从链表中剔除了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E>
implements MessagePassingQueue<E>, QueueProgressIndicators
{
private E[] nextBuffer(final E[] buffer, final long mask)
{
// 通过当前数组的最后一个元素,获取下一个待消费的数组,
// 将当前数组最后一个元素设为 BUFFER_CONSUMED,代表当前数组已经消费完毕。
final long offset = nextArrayOffset(mask);
final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
soRefElement(buffer, offset, BUFFER_CONSUMED);
return nextBuffer;
}
}

MpscArrayQueue 的优点总结

MpscQueue 由一系列数组构成,数组的最后一个元素指向下一个数组,形成单向链表。

MpscQueue 全程无锁化,非阻塞,相较于 JDK 提供的同步阻塞队列,性能有很好的提升,这也是 Netty 后来的版本将任务队列替换为 JCtools 的重要原因。

在生产过程中,它用到了大量的 CAS 操作,对于需要做并发控制的地方,确保只有一个线程会执行成功,其他 CAS 失败的线程会自旋重试,全程都是无锁非阻塞的。

不管是扩容,还是等待元素被填充到数组,这些过程都是会极快完成的,因此短暂的自旋会比线程挂起再唤醒效率更高。 数组扩容后会在原槽位填充 JUMP 元素,消费者遇到该元素就知道要寻找新数组继续消费了。

  • 通过 cacheline padding 解决核心属性的伪共享问题。
  • 数组的容量设置为 2 的次幂,可以通过位运算快速定位到数组对应下标。
  • 入队操作通过 CAS 无锁编程实现,并且通过链式数组结构,和数组节点的动态增加,解决扩容时的元素复制的问题,完成数组的快速扩容,减少 CAS 空自旋。
  • 在消费者 poll 过程中,因为只有一个消费者线程,所以整个 poll() 的过程没有 CAS 操作。
  • 通过有序的写入元素,去掉 volatile 的 StoreLoad 屏障,实现纳米级别的写入。 当然, 读取的时候,如果间隔太短,需要进行短时间的自旋。

这个是非常高性能的。 这也是 Netty、Caffeine 使用 mpsc 队列的原因。

ConcurrentAutoTable

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class ConcurrentAutoTable implements Serializable {

// --- public interface ---

/**
* Add the given value to current counter value. Concurrent updates will
* not be lost, but addAndGet or getAndAdd are not implemented because the
* total counter value (i.e., {@link #get}) is not atomically updated.
* Updates are striped across an array of counters to avoid cache contention
* and has been tested with performance scaling linearly up to 768 CPUs.
*/
public void add( long x ) { add_if( x); }
/** {@link #add} with -1 */
public void decrement() { add_if(-1L); }
/** {@link #add} with +1 */
public void increment() { add_if( 1L); }

/** Atomically set the sum of the striped counters to specified value.
* Rather more expensive than a simple store, in order to remain atomic.
*/
public void set( long x ) {
CAT newcat = new CAT(null,4,x);
// Spin until CAS works
while( !CAS_cat(_cat,newcat) ) {/*empty*/}
}

/**
* Current value of the counter. Since other threads are updating furiously
* the value is only approximate, but it includes all counts made by the
* current thread. Requires a pass over the internally striped counters.
*/
public long get() { return _cat.sum(); }
/** Same as {@link #get}, included for completeness. */
public int intValue() { return (int)_cat.sum(); }
/** Same as {@link #get}, included for completeness. */
public long longValue() { return _cat.sum(); }

/**
* A cheaper {@link #get}. Updated only once/millisecond, but as fast as a
* simple load instruction when not updating.
*/
public long estimate_get( ) { return _cat.estimate_sum(); }

/**
* Return the counter's {@code long} value converted to a string.
*/
public String toString() { return _cat.toString(); }

/**
* A more verbose print than {@link #toString}, showing internal structure.
* Useful for debugging.
*/
public void print() { _cat.print(); }

/**
* Return the internal counter striping factor. Useful for diagnosing
* performance problems.
*/
public int internal_size() { return _cat._t.length; }

// Only add 'x' to some slot in table, hinted at by 'hash'. The sum can
// overflow. Value is CAS'd so no counts are lost. The CAS is retried until
// it succeeds. Returned value is the old value.
private long add_if( long x ) { return _cat.add_if(x,hash(),this); }

// The underlying array of concurrently updated long counters
private volatile CAT _cat = new CAT(null,16/*Start Small, Think Big!*/,0L);
private static AtomicReferenceFieldUpdater<ConcurrentAutoTable,CAT> _catUpdater =
AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class,CAT.class, "_cat");
private boolean CAS_cat( CAT oldcat, CAT newcat ) { return _catUpdater.compareAndSet(this,oldcat,newcat); }

// Hash spreader
private static int hash() {
//int h = (int)Thread.currentThread().getId();
int h = System.identityHashCode(Thread.currentThread());
return h<<3; // Pad out cache lines. The goal is to avoid cache-line contention
}
}

参考资料

  1. JCTools 简介-增强的并发工具
  2. 超高性能无锁队列 Mpsc Queue