abstractclassBaseMpscLinkedArrayQueue<E> extendsBaseMpscLinkedArrayQueueColdProducerFields<E> implementsMessagePassingQueue<E>, QueueProgressIndicators { @Override publicbooleanoffer(final E e) { if (null == e) { thrownewNullPointerException(); }
long mask; E[] buffer; // 生产者指向的数组 long pIndex; // 生产索引
while (true) { // 获取生产者索引最大限制 longproducerLimit= lvProducerLimit(); // 获取生产者索引 pIndex = lvProducerIndex(); // 生产索引以 2 为步长递增, // 第 0 位标识为 resize ,所以非扩容场景,不会是奇数, // 扩容的时候,会在 offerSlowPath() 中扩容线程会将其设为奇数 // lower bit is indicative of resize, if we see it we spin until it's cleared if ((pIndex & 1) == 1) { // 奇数代表正在扩容,自旋,等待扩容完成 continue; } // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1) // pIndex 是偶数, 实际的索引值需要除以 2
// mask/buffer may get changed by resizing -> only use for array access after successful CAS. mask = this.producerMask; buffer = this.producerBuffer; // a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex)
// assumption behind this optimization is that queue is almost always empty or near empty // 阈值 producerLimit 小于等于生产者指针位置 pIndex, 需要扩容 if (producerLimit <= pIndex) { // 通过 offerSlowPath 返回状态值,来查看怎么来处理这个待添加的元素 intresult= offerSlowPath(mask, pIndex, producerLimit); switch (result) { // producerLimit 虽然达到了 limit, // 但是当前数组已经被消费了部分数据,暂时不会扩容,会使用已被消费的槽位 case CONTINUE_TO_P_INDEX_CAS: break; // 可能由于并发原因导致 CAS 失败,那么则再次重新尝试添加元素 case RETRY: continue; // 队列已满,直接返回false操作 case QUEUE_FULL: returnfalse; // 队列需要扩容操作 case QUEUE_RESIZE: // 对队列进行直接扩容操作 resize(mask, buffer, pIndex, e, null); returntrue; } }
// 有序写入数组元素 // INDEX visible before ELEMENT finallongoffset= modifiedCalcCircularRefElementOffset(pIndex, mask); // 将 buffer 数组的指定位置替换为 e, // 不是根据下标来设置的,是根据槽位的地址偏移量 offset, UNSAFE 操作 soRefElement(buffer, offset, e); // release element e returntrue; } }
abstractclassBaseMpscLinkedArrayQueue<E> extendsBaseMpscLinkedArrayQueueColdProducerFields<E> implementsMessagePassingQueue<E>, QueueProgressIndicators { /** * We do not inline resize into this method because we do not resize on fill. */ privateintofferSlowPath(long mask, long pIndex, long producerLimit) { // 消费者索引 finallongcIndex= lvConsumerIndex(); // 数组缓冲的容量,(长度-1) * 2 longbufferCapacity= getCurrentBufferCapacity(mask);
// 消费索引 + 当前数组的容量 > 生产索引,代表当前数组已有部分元素被消费了, // 所以不会扩容,会使用已被消费的槽位。 if (cIndex + bufferCapacity > pIndex) { if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) { // CAS失败,自旋重试 // retry from top return RETRY; } else { // 重试用 CAS 方式修改生产索引 // continue to pIndex CAS return CONTINUE_TO_P_INDEX_CAS; } } // 根据生产者和消费者索引判断 Queue 是否已满,无界队列永不会满 // full and cannot grow elseif (availableInQueue(pIndex, cIndex) <= 0) { // offer should return false; return QUEUE_FULL; } // 用 CAS 的方式将 producerIndex 加 1, 奇数代表正在 resize // grab index for resize -> set lower bit elseif (casProducerIndex(pIndex, pIndex + 1)) { // trigger a resize return QUEUE_RESIZE; } else { // failed resize attempt, retry from top return RETRY; } } }
如果需要扩容,线程会通过 CAS 操作将 producerIndex 加 1 使其变为奇数,让其它线程能感知到队列正在扩容;此时其它要生产数据的线程会先自旋,等待扩容完成后再继续操作。
// 元素 e 放到新数组中 soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array // 旧数组和新数组建立连接,旧数组的最后一个元素保存新数组的地址 soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
abstractclassBaseMpscLinkedArrayQueue<E> extendsBaseMpscLinkedArrayQueueColdProducerFields<E> implementsMessagePassingQueue<E>, QueueProgressIndicators { /** * {@inheritDoc} * <p> * This implementation is correct for single consumer thread use only. * poll() 没有做并发控制, MpscQueue 是多生产单消费者的 Queue ,只有一个消费者,这个也是 netty 换成 MSPCQueue 的主要原因 */ @SuppressWarnings("unchecked") @Override public E poll() { final E[] buffer = consumerBuffer; finallongcIndex= lpConsumerIndex(); finallongmask= consumerMask;
finallongoffset= modifiedCalcCircularRefElementOffset(cIndex, mask); Objecte= lvRefElement(buffer, offset); if (e == null) { longpIndex= lvProducerIndex(); // isEmpty? // 元素已经消费完 if ((cIndex - pIndex) / 2 == 0) { returnnull; } // offer() 时生产者先 CAS 改 producerIndex,再设置元素,中间会有一个时间差,此时会自旋,等待元素设置完成 // poll() == null iff queue is empty, null element is not strong enough indicator, so we must // spin until element is visible. do { e = lvRefElement(buffer, offset); } while (e == null); }
// 队列扩容了 if (e == JUMP) { // 获取下一个数组 final E[] nextBuffer = nextBuffer(buffer, mask); // 从下一个数组中消费 return newBufferPoll(nextBuffer, cIndex); }
/** * Add the given value to current counter value. Concurrent updates will * not be lost, but addAndGet or getAndAdd are not implemented because the * total counter value (i.e., {@link #get}) is not atomically updated. * Updates are striped across an array of counters to avoid cache contention * and has been tested with performance scaling linearly up to 768 CPUs. */ publicvoidadd( long x ) { add_if( x); } /** {@link #add} with -1 */ publicvoiddecrement() { add_if(-1L); } /** {@link #add} with +1 */ publicvoidincrement() { add_if( 1L); }
/** Atomically set the sum of the striped counters to specified value. * Rather more expensive than a simple store, in order to remain atomic. */ publicvoidset( long x ) { CATnewcat=newCAT(null,4,x); // Spin until CAS works while( !CAS_cat(_cat,newcat) ) {/*empty*/} }
/** * Current value of the counter. Since other threads are updating furiously * the value is only approximate, but it includes all counts made by the * current thread. Requires a pass over the internally striped counters. */ publiclongget() { return _cat.sum(); } /** Same as {@link #get}, included for completeness. */ publicintintValue() { return (int)_cat.sum(); } /** Same as {@link #get}, included for completeness. */ publiclonglongValue() { return _cat.sum(); }
/** * A cheaper {@link #get}. Updated only once/millisecond, but as fast as a * simple load instruction when not updating. */ publiclongestimate_get( ) { return _cat.estimate_sum(); }
/**
 * Return the counter's {@code long} value converted to a string.
 *
 * @return the string produced by the current CAT
 */
@Override
public String toString() {
    return _cat.toString();
}
/** * A more verbose print than {@link #toString}, showing internal structure. * Useful for debugging. */ publicvoidprint() { _cat.print(); }
/** * Return the internal counter striping factor. Useful for diagnosing * performance problems. */ publicintinternal_size() { return _cat._t.length; }
// Only add 'x' to some slot in table, hinted at by 'hash'. The sum can // overflow. Value is CAS'd so no counts are lost. The CAS is retried until // it succeeds. Returned value is the old value. privatelongadd_if( long x ) { return _cat.add_if(x,hash(),this); }
// The underlying array of concurrently updated long counters privatevolatileCAT_cat=newCAT(null,16/*Start Small, Think Big!*/,0L); privatestatic AtomicReferenceFieldUpdater<ConcurrentAutoTable,CAT> _catUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class,CAT.class, "_cat"); privatebooleanCAS_cat( CAT oldcat, CAT newcat ) { return _catUpdater.compareAndSet(this,oldcat,newcat); }
// Hash spreader privatestaticinthash() { //int h = (int)Thread.currentThread().getId(); inth= System.identityHashCode(Thread.currentThread()); return h<<3; // Pad out cache lines. The goal is to avoid cache-line contention } }