disruptor笔记
Disruptor 简介
Disruptor 是一个开源的高性能队列框架,由英国外汇交易公司 LMAX 开发。研发的初衷是为了解决内存队列的延迟问题,能够在无锁的情况下实现队列的并发操作,基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单。
队列的特性
先进先出(FIFO),生产者往队列里发布事件,消费者获得通知消费事件;如果队列中没有事件时,消费者堵塞,直到生产者发布了新事件。
Java 内置队列
队列 | 有界性 | 锁 | 阻塞 | 数据结构 |
---|---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | 是 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | 是 | linkedlist |
ConcurrentLinkedQueue | unbounded | CAS | 否 | linkedlist |
LinkedTransferQueue | unbounded | CAS | 是 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | 是 | heap |
DelayQueue | unbounded | 加锁 | 是 | heap |
SynchronizedQueue | synchronized | CAS | 是 | linkedlist |
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。
我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是 ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成 LinkedBlockingQueue 和 ConcurrentLinkedQueue 两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的 LinkedTransferQueue 都是通过原子变量 compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。
使用 CAS 协议实现的队列都是无界的(无法保证队列的长度在确定的范围内),理论上来说可以是无限扩展,那么如果生产者生产过快,消费者还没来得及消费,最终可能会导致内存溢出,影响系统稳定;而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少 Java 的垃圾回收对系统性能的影响,会尽量选择 array/heap 格式的数据结构。这样筛选下来,符合条件的队列就只有 ArrayBlockingQueue。
更多 java 原生队列相关知识,参见: [java原生队列]
队列加锁性能
现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。
Disruptor 论文中讲述了一个实验:
- 这个测试程序调用了一个函数,该函数会对一个 64 位的计数器循环自增 5 亿次。
- 机器环境:2.4G 6 核
- 运算: 64 位的计数器累加 5 亿次
Method | Time (ms) |
---|---|
Single thread | 300 |
Single thread with CAS | 5,700 |
Single thread with lock | 10,000 |
Single thread with volatile write | 4,700 |
Two threads with CAS | 30,000 |
Two threads with lock | 224,000 |
CAS 操作比单线程无锁慢了 1 个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢 3 个数量级。可见无锁速度最快。
单线程情况下,不加锁的性能 > CAS 操作的性能 > 加锁的性能。
多线程情况下,为了保证线程安全,必须使用 CAS 或锁,这种情况下,CAS 的性能超过锁的性能,前者大约是后者的 8 倍。
综上可知,加锁的性能是最差的。
Disruptor 设计方案
Disruptor 通过以下设计来解决队列速度慢的问题:
- 环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。 - 元素位置定位
数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。 - 无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。 - 缓存行填充
缓存行填充解决了伪共享(False Sharing)问题,提高多线程并发访问共享变量时的性能。
缓存行填充原因及原理: 对齐填充的应用
Disruptor 主要实现类
- Disruptor:Disruptor 的入口,主要封装了环形队列 RingBuffer、消费者集合 ConsumerRepository 的引用;主要提供了获取环形队列、添加消费者、生产者向 RingBuffer 中添加事件(可以理解为生产者生产数据)的操作;
- RingBuffer:Disruptor 中队列具体的实现,底层封装了 Object[]数组;在初始化时,会使用 Event 事件对数组进行填充,填充的大小就是 bufferSize 设置的值;此外,该对象内部还维护了 Sequencer(序列生产器)具体的实现;
- Sequencer:序列生产器,分别有 MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。上面的例子中,使用的是 SingleProducerSequencer;在 Sequencer 中,维护了消费者的 Sequence(序列对象)和生产者自己的 Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略 WaitStrategy;
- Sequence:序列对象,内部维护了一个 long 型的 value,这个序列指向了 RingBuffer 中 Object[] 数组具体的角标。生产者和消费者各自维护自己的 Sequence;但都是指向 RingBuffer 的 Object[] 数组;
- Wait Strategy:等待策略。当没有可消费的事件时,消费者根据特定的策略进行等待;当没有可生产的地方时,生产者根据特定的策略进行等待;
- Event:事件对象,就是我们 Ringbuffer 中存在的数据,在 Disruptor 中用 Event 来定义数据,并不存在 Event 类,它只是一个定义;
- EventProcessor:事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用我们自定义的事件处理器,也就是是否可以进行消费;
- EventHandler:事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现 EventHandler 接口;
- Producer:事件生产者;
无锁设计实现原理
下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量 CAS,保证操作的线程安全。
多生产者
多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor 的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过 CAS 很容易达到。只需要在分配元素的时候,通过 CAS 判断一下这段空间是否已经分配出去即可。
但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor 在多个生产者的情况下,引入了一个与 Ring Buffer 大小相同的 buffer:available Buffer。当某个位置写入成功的时候,便把 availble Buffer 相应的位置置位,标记为写入成功。读取的时候,会遍历 available Buffer,来判断元素是否已经就绪。
下面分读数据和写数据两种情况介绍。
读数据
- 申请读取到序号 n;
- 若 writer cursor >= n,这时仍然无法确定连续可读的最大下标。从 reader cursor 开始读取 available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
- 消费者读取元素。
如下图所示,读线程读到下标为 2 的元素,三个线程 Writer1/Writer2/Writer3 正在向 RingBuffer 相应位置写数据,写线程被分配到的最大元素下标是 11。
读线程申请读取到下标从 3 到 11 的元素,判断 writer cursor>=11。然后开始读取 availableBuffer,从 3 开始,往后读取,发现下标为 7 的元素没有生产成功,于是 WaitFor(11)返回 6。
然后,消费者读取下标从 3 到 6 共计 4 个元素。
写数据
- 申请写入 m 个元素;
- 若是有 m 个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
- 生产者写入元素,写入元素的同时设置 available Buffer 里面相应的位置,以标记自己哪些位置是已经写入成功的。
如下图所示,Writer1 和 Writer2 两个线程写入数组,都申请可写的数组空间。Writer1 被分配了下标 3 到下表 5 的空间,Writer2 被分配了下标 6 到下标 9 的空间。
Writer1 写入下标 3 位置的元素,同时把 available Buffer 相应位置置位,标记已经写入成功,往后移一位,开始写下标 4 位置的元素。Writer2 同样的方式。最终都写入完成。
等待策略
生产者等待策略
暂时只有休眠 1ns。
1 | LockSupport.parkNanos(1); |
消费者等待策略
名称 | 措施 | 适用场景 |
---|---|---|
BlockingWaitStrategy | 加锁 | CPU 资源紧缺,吞吐量和延迟并不重要的场景 |
BusySpinWaitStrategy | 自旋 | 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定义策略 | CPU 资源紧缺,吞吐量和延迟并不重要的场景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和 CPU 资源之间有很好的折中。延迟不均匀 |
TimeoutBlockingWaitStrategy | 加锁,有超时限制 | CPU 资源紧缺,吞吐量和延迟并不重要的场景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和 CPU 资源之间有很好的折中。延迟比较均匀 |
缓存行填充
缓存行填充解决了伪共享(False Sharing)问题,提高多线程并发访问共享变量时的性能。
在现代计算机系统中,CPU 缓存以缓存行(Cache Line)为单位进行数据加载和存储。缓存行是一个固定大小的数据块,在 x86 架构中通常为 64 字节。当一个线程访问一个共享变量时,它会加载所在缓存行的数据到 CPU 缓存中。如果多个线程同时访问位于同一缓存行的不同变量,会导致伪共享问题。
伪共享问题的产生是由于多个线程的缓存行之间存在数据竞争,即它们共享同一缓存行,但实际上它们操作的变量是不同的。由于缓存一致性协议的限制,一个线程对共享变量的修改会导致整个缓存行的数据无效,从而迫使其他线程重新加载整个缓存行的数据,这会导致严重的性能下降。
缓存行填充的作用是通过在变量之间插入空白字段,使得共享变量被放置在不同的缓存行中,从而避免了多个线程之间对同一缓存行的竞争。这样,即使多个线程同时访问不同的共享变量,它们的操作不会影响彼此,也不会导致缓存行失效,从而显著提高了并发访问性能。
缓存行填充详细原因及原理: 对齐填充的应用