disruptor笔记

Disruptor 简介

Disruptor 是一个开源的高性能队列框架,由英国外汇交易公司 LMAX 开发。研发的初衷是为了解决内存队列的延迟问题,能够在无锁的情况下实现队列的并发操作,基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单。

队列的特性

先进先出(FIFO),生产者往队列里发布事件,消费者获得通知消费事件;如果队列中没有事件时,消费者堵塞,直到生产者发布了新事件。

Java 内置队列

队列 有界性 阻塞 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded CAS linkedlist
LinkedTransferQueue unbounded CAS linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap
SynchronizedQueue synchronized CAS linkedlist

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是 ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成 LinkedBlockingQueue 和 ConcurrentLinkedQueue 两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的 LinkedTransferQueue 都是通过原子变量 compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

使用 CAS 协议实现的队列都是无界的(无法保证队列的长度在确定的范围内),理论上来说可以是无限扩展,那么如果生产者生产过快,消费者还没来得及消费,最终可能会导致内存溢出,影响系统稳定;而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少 Java 的垃圾回收对系统性能的影响,会尽量选择 array/heap 格式的数据结构。这样筛选下来,符合条件的队列就只有 ArrayBlockingQueue。

更多 java 原生队列相关知识,参见: [java原生队列]

队列加锁性能

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor 论文中讲述了一个实验:

  • 这个测试程序调用了一个函数,该函数会对一个 64 位的计数器循环自增 5 亿次。
  • 机器环境:2.4G 6 核
  • 运算: 64 位的计数器累加 5 亿次
Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

CAS 操作比单线程无锁慢了 1 个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢 3 个数量级。可见无锁速度最快。

单线程情况下,不加锁的性能 > CAS 操作的性能 > 加锁的性能。

多线程情况下,为了保证线程安全,必须使用 CAS 或锁,这种情况下,CAS 的性能超过锁的性能,前者大约是后者的 8 倍。

综上可知,加锁的性能是最差的。

Disruptor 设计方案

Disruptor 通过以下设计来解决队列速度慢的问题:

  1. 环形数组结构
    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
  2. 元素位置定位
    数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。
  3. 无锁设计
    每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
  4. 缓存行填充
    缓存行填充解决了伪共享(False Sharing)问题,提高多线程并发访问共享变量时的性能。
    缓存行填充原因及原理: 对齐填充的应用

Disruptor 主要实现类

  1. Disruptor:Disruptor 的入口,主要封装了环形队列 RingBuffer、消费者集合 ConsumerRepository 的引用;主要提供了获取环形队列、添加消费者、生产者向 RingBuffer 中添加事件(可以理解为生产者生产数据)的操作;
  2. RingBuffer:Disruptor 中队列具体的实现,底层封装了 Object[]数组;在初始化时,会使用 Event 事件对数组进行填充,填充的大小就是 bufferSize 设置的值;此外,该对象内部还维护了 Sequencer(序列生产器)具体的实现;
  3. Sequencer:序列生产器,分别有 MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。上面的例子中,使用的是 SingleProducerSequencer;在 Sequencer 中,维护了消费者的 Sequence(序列对象)和生产者自己的 Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略 WaitStrategy;
  4. Sequence:序列对象,内部维护了一个 long 型的 value,这个序列指向了 RingBuffer 中 Object[] 数组具体的角标。生产者和消费者各自维护自己的 Sequence;但都是指向 RingBuffer 的 Object[] 数组;
  5. Wait Strategy:等待策略。当没有可消费的事件时,消费者根据特定的策略进行等待;当没有可生产的地方时,生产者根据特定的策略进行等待;
  6. Event:事件对象,就是我们 Ringbuffer 中存在的数据,在 Disruptor 中用 Event 来定义数据,并不存在 Event 类,它只是一个定义;
  7. EventProcessor:事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用我们自定义的事件处理器,也就是是否可以进行消费;
  8. EventHandler:事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现 EventHandler 接口;
  9. Producer:事件生产者;

无锁设计实现原理

下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量 CAS,保证操作的线程安全。

多生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor 的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过 CAS 很容易达到。只需要在分配元素的时候,通过 CAS 判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor 在多个生产者的情况下,引入了一个与 Ring Buffer 大小相同的 buffer:available Buffer。当某个位置写入成功的时候,便把 availble Buffer 相应的位置置位,标记为写入成功。读取的时候,会遍历 available Buffer,来判断元素是否已经就绪。

下面分读数据和写数据两种情况介绍。

读数据

  1. 申请读取到序号 n;
  2. 若 writer cursor >= n,这时仍然无法确定连续可读的最大下标。从 reader cursor 开始读取 available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  3. 消费者读取元素。

如下图所示,读线程读到下标为 2 的元素,三个线程 Writer1/Writer2/Writer3 正在向 RingBuffer 相应位置写数据,写线程被分配到的最大元素下标是 11。

读线程申请读取到下标从 3 到 11 的元素,判断 writer cursor>=11。然后开始读取 availableBuffer,从 3 开始,往后读取,发现下标为 7 的元素没有生产成功,于是 WaitFor(11)返回 6。

然后,消费者读取下标从 3 到 6 共计 4 个元素。

多个生产者情况下,消费者消费过程示意图

写数据

  1. 申请写入 m 个元素;
  2. 若是有 m 个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置 available Buffer 里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1 和 Writer2 两个线程写入数组,都申请可写的数组空间。Writer1 被分配了下标 3 到下表 5 的空间,Writer2 被分配了下标 6 到下标 9 的空间。

Writer1 写入下标 3 位置的元素,同时把 available Buffer 相应位置置位,标记已经写入成功,往后移一位,开始写下标 4 位置的元素。Writer2 同样的方式。最终都写入完成。

多个生产者情况下,生产者生产过程示意图

等待策略

生产者等待策略

暂时只有休眠 1ns。

1
LockSupport.parkNanos(1);

消费者等待策略

名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU 资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU 资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和 CPU 资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU 资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和 CPU 资源之间有很好的折中。延迟比较均匀

缓存行填充

缓存行填充解决了伪共享(False Sharing)问题,提高多线程并发访问共享变量时的性能。

在现代计算机系统中,CPU 缓存以缓存行(Cache Line)为单位进行数据加载和存储。缓存行是一个固定大小的数据块,在 x86 架构中通常为 64 字节。当一个线程访问一个共享变量时,它会加载所在缓存行的数据到 CPU 缓存中。如果多个线程同时访问位于同一缓存行的不同变量,会导致伪共享问题。

伪共享问题的产生是由于多个线程的缓存行之间存在数据竞争,即它们共享同一缓存行,但实际上它们操作的变量是不同的。由于缓存一致性协议的限制,一个线程对共享变量的修改会导致整个缓存行的数据无效,从而迫使其他线程重新加载整个缓存行的数据,这会导致严重的性能下降。

缓存行填充的作用是通过在变量之间插入空白字段,使得共享变量被放置在不同的缓存行中,从而避免了多个线程之间对同一缓存行的竞争。这样,即使多个线程同时访问不同的共享变量,它们的操作不会影响彼此,也不会导致缓存行失效,从而显著提高了并发访问性能。

缓存行填充详细原因及原理: 对齐填充的应用

参考资料

  1. LMAX Disruptor User Guide
  2. 高性能队列——Disruptor