0%

原子性操作类--高并发下的CAS解决方案

JUC 包提供了一系列的原子性操作类,这些类都是使用非阻塞算法CAS 实现的,相比使用锁实现原子性操作这在性能上有很大提高。由于原子性操作类的原理都大致相同,所以本文只讲解最简单的AtomicLong 类及JDK 8 中新增的LongAdder 的实现原理。

AtomicLong

JUC 并发包中包含有AtomiclntegerAtomicLongAtomicBoolean 等原子性操作类,它们的原理类似。AtomicLong 是原子性递增或者递减类,其内部使用Unsafe 来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AtomicLong extends Number implements java.io.Serializable {
//获取Unsafe实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
//存放value的偏移量
private static final long valueOffset;




static {
try {
//获取value在AtomicLong的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//实际变量值,使用volatile修饰
private volatile long value;




//调用unsafe方法,原子性设置value值为原始值+1, 返回值为原始值
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

}

getAndIncrement()getAndDecrement()等主要数都是通过调用UnsafegetAndAddLong 方法来实现操作,其在JDK8中的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
//传入对象地址和实际值偏移量,获取实际值
var6 = this.getLongVolatile(var1, var2);
}

//尝试CAS更新实际值,失败则重试
while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));


return var6;
}

LongAdder

AtomicLong 通过CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能己经很好了,但是使用AtomicLong 时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS 的操作, 而这会白白浪费CPU 资源。JDK 8 新增了一个原子性递增或者递减类LongAdder 用来克服在高并发下使用AtomicLong 的缺点。

既然AtomicLong 的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,那么可以把一个变量分解为多个变量,让同样多的线程去竞争多个资源,从而解决性能问题,这就是LongAdder设计的思路。

Java AtomicLong 和LongAdder | A Big Boy Blog - Tech Articls & Notes

LongAdder 在内部维护多个Cell 变量,Cell 类型是AtomicLong的一个改进。这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个Cell 原子变量时如果失败了, 它并不是在当前Cell 变量上一直自旋CAS 重试,而是尝试在其他Cell的变量上进行CAS 尝试,这个改变增加了当前线程重试CAS 成功的可能性。

最后,在获取LongAdder当前值时, 是把所有Cell 变量的value值累加后再加上base返回的。

由于Cells 占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。LongAdder 维护了一个延迟初始化的原子性更新数组(默认情况下Cell 数组是null)和一个基值变量base

image-20211019171832480

LongAdder 类内部维护着三个变量。获取LongAdder 的真实值时,得到的其实是base 的值与cells 数组里面所有Cell 元素中的value 值的累加,base 是个基础值,默认为0 。cellsBusy 用来实现自旋锁,状态值只有0 和1,使用CAS操作cellsBusy 来保证同时只有一个线程可以进行创建Cell 元素、扩容cells 数组、初始化cells 数组这些操作。

LongAdder 类的主要函数有:

  • long sum():返回当前的值。
1
2
3
4
5
6
7
8
9
10
11
12
public long sum() {
Cell[] as = cells; Cell a;
//bash值加上所有Cell内部的value值后就是当前值
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

要注意的是,由于计算总和时没有对cells数组进行加锁,所以在累加过程中可能有其他线程对Cell 中的值进行了修改, 也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的, 其返回值并不是一个调用sum 方法时的原子快照值。

  • add(long x):增加给定值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;

//cell是否为null,是的话就在基础变量base上累加
if ((as = cells) != null || !casBase(b = base, b + x)) {

boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||

/*
决定当前线程应该访问cells 数组里面的哪一个Cell元素
其中m 是当前cells数组元素个数-1 , getProbe()获取的是当前线程中变量threadLocalRandomProbe的值
*/
(a = as[getProbe() & m]) == null ||

//用CAS操作去更新分配到的Cell元素的value 值
!(uncontended = a.cas(v = a.value, v + x)))

//初始化或扩容cells数组
longAccumulate(x, null, uncontended);
}
}

longAccumulate源码解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
/*
初始化当前线程的变量threadLocalRandomProbe的值,
这个变量在计算当前线程应该被分配到cells数组的哪一个Cell 元素时会用到。
*/
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}

boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {

//当前线程计算要访问的Cell元素下标,然后如果发现对应下标元素的值为null
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
/*
新增一个Cell 元素到cell 数组,
并且在将其添加到cells 数组之前要竞争设置cellsBusy 为1
*/
Cell r = new Cell(x); // Optimistically create

if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;

/*
下面数组是扩容cells数组的代码,扩容条件的前提具体就是
当前cells 的元素个数小于当前机器CPU个数
并且当前多个线程访问了cells中同一个元素从而导致冲突使其中一个线程CAS失败。
和初始化一样,需要cas设置cellsBusy值,保证cells数组的互斥访问。
*/
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
//扩容为原来的两倍
Cell[] rs = new Cell[n << 1];
//将旧的cells数组复制新的cells数组
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
/*CAS 失败的线程重新计算当前线程的随机值threadLocalRandomProbe,
以减少下次访问cells元素时的冲突机会*/
h = advanceProbe(h);
}

//下面是初始化cells数组的代码

/*
cellsBusy 是一个状态标识,
为0说明当前cells 数组没有在被初始化或者扩容,也没有在新建Cell元素,
当前线程通过casCellsBusy()设置cellsBusy 为l ,
则当前线程在初始化cells数组时,其他线程就不能进行对cells数组进行初始化。
*/
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//初始化cells 数组元素个数为2
Cell[] rs = new Cell[2];
//使用h&1计算当前线程应该访问celll 数组的哪个位置
rs[h & 1] = new Cell(x);
cells = rs;
//标识cells 数组已经被初始化
init = true;
}
} finally {
/*重置了cellsBusy 标记。这里没有使用CAS 操作,却是线程安全的,
原因是cellsBusy 是volatile类型的,这保证了变量的内存可见性,
另外此时其他地方的代码没有机会修改cellsBusy的值*/
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

阅读上面的源码后,我们基本可以回答以下几个重要问题:

  • LongAdder 的结构是怎样的

    base基值变量,cells数组,cellsBusy标识

  • 当前线程应该访问cells数组里面的哪一个Cell 元素

    元素下标通过cells数组长度对threadLocalRandomProbe取模计算得到,threadLocalRandomProbe可以看成是ThreadLocal类型的变量,只与当前线程相关。

  • 如何初始化cells数组? 略

  • cells数组如何扩容?略

  • 线程访问分配的Cell 元素有冲突后如何处理:扩容(需要满足cells 数组元素个数小于CPU核心数),并重新计算当前线程的threadLocalRandomProbe,减小冲突概率

  • 如何保证线程操作被分配的Cell 元素的原子性:CAS操作