JUC 包提供了一系列的原子性操作类,这些类都是使用非阻塞算法CAS 实现的,相比使用锁实现原子性操作这在性能上有很大提高。由于原子性操作类的原理都大致相同,所以本文只讲解最简单的AtomicLong
类及JDK 8 中新增的LongAdder
的实现原理。
AtomicLong
JUC 并发包中包含有Atomiclnteger
、AtomicLong
和AtomicBoolean
等原子性操作类,它们的原理类似。AtomicLong 是原子性递增或者递减类,其内部使用Unsafe
来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class AtomicLong extends Number implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset;
static { try { valueOffset = unsafe.objectFieldOffset (AtomicLong.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile long value;
public final long getAndIncrement() { return unsafe.getAndAddLong(this, valueOffset, 1L); }
}
|
getAndIncrement()
、getAndDecrement()
等主要数都是通过调用Unsafe
的getAndAddLong
方法来实现操作,其在JDK8中的实现为:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6; }
|
LongAdder
AtomicLong
通过CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能己经很好了,但是使用AtomicLong
时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS 的操作, 而这会白白浪费CPU 资源。JDK 8 新增了一个原子性递增或者递减类LongAdder
用来克服在高并发下使用AtomicLong
的缺点。
既然AtomicLong
的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,那么可以把一个变量分解为多个变量,让同样多的线程去竞争多个资源,从而解决性能问题,这就是LongAdder
设计的思路。

LongAdder
在内部维护多个Cell
变量,Cell
类型是AtomicLong
的一个改进。这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个Cell
原子变量时如果失败了, 它并不是在当前Cell
变量上一直自旋CAS 重试,而是尝试在其他Cell
的变量上进行CAS 尝试,这个改变增加了当前线程重试CAS 成功的可能性。
最后,在获取LongAdder
当前值时, 是把所有Cell
变量的value值累加后再加上base
返回的。
由于Cells
占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。LongAdder
维护了一个延迟初始化的原子性更新数组(默认情况下Cell
数组是null
)和一个基值变量base
。

LongAdder
类内部维护着三个变量。获取LongAdder
的真实值时,得到的其实是base
的值与cells
数组里面所有Cell
元素中的value
值的累加,base
是个基础值,默认为0 。cellsBusy
用来实现自旋锁,状态值只有0 和1,使用CAS操作cellsBusy
来保证同时只有一个线程可以进行创建Cell
元素、扩容cells
数组、初始化cells
数组这些操作。
LongAdder
类的主要函数有:
1 2 3 4 5 6 7 8 9 10 11 12
| public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
|
要注意的是,由于计算总和时没有对cells
数组进行加锁,所以在累加过程中可能有其他线程对Cell
中的值进行了修改, 也有可能对数组进行了扩容,所以sum
返回的值并不是非常精确的, 其返回值并不是一个调用sum
方法时的原子快照值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
|
longAccumulate
源码解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h;
if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) {
Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) collide = true;
else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; }
h = advanceProbe(h); }
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally {
cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
|
阅读上面的源码后,我们基本可以回答以下几个重要问题:
LongAdder 的结构是怎样的
base
基值变量,cells
数组,cellsBusy
标识
当前线程应该访问cells
数组里面的哪一个Cell
元素
元素下标通过cells
数组长度对threadLocalRandomProbe
取模计算得到,threadLocalRandomProbe
可以看成是ThreadLocal
类型的变量,只与当前线程相关。
如何初始化cells
数组? 略
cells
数组如何扩容?略
线程访问分配的Cell 元素有冲突后如何处理:扩容(需要满足cells
数组元素个数小于CPU核心数),并重新计算当前线程的threadLocalRandomProbe
,减小冲突概率
如何保证线程操作被分配的Cell
元素的原子性:CAS操作