文章目录
- 高并发场景下CAS效率的优化
- 1.空间换时间(LongAdder)
- 2.对比LongAdder和AtomicLong执行效率
- 2.1.AtmoictLong
- 2.2.LongAdder
- 2.3.比对
- 3.LongAdder原理
- 3.1.基类Striped64内部的三个重要成员
- 3.2.LongAdder.add()方法
- 3.3.LongAdder中longAccumulate()方法
- 3.4.LongAdder.casCellsBusy()方法
高并发场景下CAS效率的优化
在高并发情况下,CAS操作的自旋重试会导致系统开销的增加,甚至有些线程可能进入一个无线重复的循环中。
除了存在CAS空自旋外,在SMP机构的CPU平台上,大量的CAS操作还可能导致
"总线风暴"
总线风暴是指在计算机系统中,由于大量的处理器或设备同时请求总线而导致的总线利用率骤增的现象。在多处理器系统或者高性能计算机中,当大量的处理器或者设备同时尝试访问系统总线时,可能会出现总线的瓶颈,导致系统性能下降或者崩溃。
总线风暴通常发生在以下情况下:
- 大量处理器竞争总线资源: 在多处理器系统中,如果大量的处理器同时竞争总线资源,比如同时发送读写请求或者数据传输请求,就会导致总线的繁忙,进而引发总线风暴。
- 外设访问频繁: 除了处理器外,系统中的其他外设,如存储设备、网络接口等,如果频繁地访问总线,也可能造成总线风暴。
总线风暴会带来严重的系统性能问题,包括但不限于:
- 性能下降: 总线风暴会导致总线资源的过度利用,从而降低了其他设备和处理器对总线的访问效率,系统的整体性能会因此下降。
- 数据丢失和冲突: 当多个设备同时请求总线时,可能会导致数据的丢失和冲突,影响数据的正确传输和处理。
- 系统稳定性问题: 如果总线风暴持续时间较长或者严重程度较高,可能会导致系统崩溃或者死锁等严重的稳定性问题。
主要影响的有一下几方面:
-
竞争激烈导致自旋时间延长: 当有多个线程同时竞争同一份资源时,只有一个线程能够成功执行CAS操作,其他线程将会不断进行自旋重试。如果竞争激烈,那么自旋时间会变得非常长,因为每个线程都需要等待前一个线程释放资源,才能尝试进行CAS操作。这样一来,自旋的时间延长会增加系统的开销,降低系统的性能。
-
上下文切换开销增加: 自旋重试过程中,如果一个线程在自旋过程中被抢占,需要进行上下文切换到其他线程,再切回来时会带来额外的开销。在高并发情况下,频繁的上下文切换会导致系统资源的浪费,降低系统的整体性能。
-
缓存竞争增加: 当多个线程同时竞争同一份资源时,由于缓存一致性协议的存在,会导致缓存行的竞争和缓存行的无效化。这会增加缓存访问的延迟和内存总线的压力,进而影响系统的性能。
-
对系统负载的影响: 随着自旋重试次数的增加,系统的负载也会增加。这会影响系统的响应速度和吞吐量,导致系统的整体性能下降。
1.空间换时间(LongAdder)
JDK8,提供了一个新的类,LongAdder,以空间换时间的方式提升高并发场景下CAS的操作性能。
LongAdder是Java并发包(java.util.concurrent)中提供的一种用于高并发环境下的原子累加器类型。它主要解决了在高并发情况下使用AtomicLong时可能出现的性能瓶颈问题。核心思想就是:热点分离
LongAdder的主要特点和优势包括:
-
分段累加: LongAdder内部采用了分段的方式来进行累加操作。它将累加器的值分成多个段,每个段有一个独立的累加器变量,称为"cell"。这样,在高并发情况下,不同线程可以同时对不同段的累加器进行操作,减少了线程竞争,提高了并发性能。(将value值分割成一个数组,当多线程访问时,通过Hash算法将线程映射到一个元素进行操作,从而获取value的结果,最终将数组的元素求和)
-
无锁操作: LongAdder的每个段都是独立的,因此在对各个段进行累加操作时,并不需要加锁。它使用了一种类似于CAS(Compare And Swap)的乐观锁机制,避免了使用全局锁带来的性能损耗。
-
高并发性能: 在高并发环境下,LongAdder的性能明显优于AtomicLong。由于采用了分段累加和无锁操作,LongAdder能够更好地应对大量线程同时对累加器进行操作的情况,从而提高了系统的并发性能。
-
自动扩容: 如果当前的线程数量超过了初始分段的数量,LongAdder会自动扩容,增加更多的段来适应更大的并发量,从而保持较高的性能。
-
统计汇总: LongAdder提供了
sum()
方法来获取所有段的累加器值的总和,方便对累加结果进行统计和汇总。
2.对比LongAdder和AtomicLong执行效率
下面我们做个小实验,通过代码来观察一下 LongAdder和AtomicLong执行的一个效率,我们使用10个线程,每个线程累计累加1000次管家一下执行时间
2.1.AtmoictLong
private static final Logger log = LoggerFactory.getLogger(LongAdderTest.class);
@Test
@DisplayName("测试AtmoictLong的执行效率")
public void testLongAdder() {
long start = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(10);
AtomicLong longAdder = new AtomicLong();
ArrayList<Thread> threads = new ArrayList<>();
for (int j = 0; j <10; j++) {
threads.add(new Thread(()->{
for (int i = 0; i < 1000000000; i++) {
longAdder.incrementAndGet();
}
latch.countDown();
}));
}
// 启动全部线程
threads.forEach(Thread::start);
// 等待全部线程执行完毕
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
long end = System.currentTimeMillis();
log.error("执行结果:{}",longAdder.longValue());
log.error("执行耗时:{}ms",end-start);
}
2.2.LongAdder
@Test
@DisplayName("测试LongAdder的执行效率")
public void testLongAdder() {
long start = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(10);
LongAdder longAdder = new LongAdder();
ArrayList<Thread> threads = new ArrayList<>();
for (int j = 0; j <10; j++) {
threads.add(new Thread(()->{
for (int i = 0; i < 1000000000; i++) {
longAdder.add(1);
}
latch.countDown();
}));
}
// 启动全部线程
threads.forEach(Thread::start);
// 等待全部线程执行完毕
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
long end = System.currentTimeMillis();
log.error("执行结果:{}",longAdder.longValue());
log.error("执行耗时:{}ms",end-start);
}
2.3.比对
但是LongAdder 也不是任意场景下都比 Atomic块,如果数量量小的情况下,AtomicLong的速度要更快一些的
AtomicLong 是一种基于 CAS(Compare And Swap)的方式实现的原子操作类,适用于并发量比较小的场景。它在单个变量上执行原子操作,适用于高并发但并发量不是特别大的情况。因为它的实现比较轻量级,适合于竞争激烈但线程数量不是特别多的情况。
LongAdder 则是针对高并发场景做了优化的一种方式。它将变量分散到多个单元中,并行地执行更新操作,然后在需要获取当前值的时候将这些单元的值合并起来。这种方式在高并发的情况下能够减小竞争,从而提高性能。但是在并发量比较小的情况下,这种分散和合并的操作会带来额外的开销,使得 LongAdder 的性能略逊于 AtomicLong。
选择使用 LongAdder 还是 AtomicLong 取决于具体场景和需求。
3.LongAdder原理
AtomicLong使用内部变量value保存的实际的long值,所有的操作都是针对该value变量进行,在高并发的环境下面,value变量其实就是一个热点,也就是N个线程竞争一个热点。重新线程越多,也就意味着CAS失败的概率越高,从而进入恶性的CAS自旋状态
LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中不同槽中的元素,每个线程只能对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多了
LongAdder的实现思路其实和CurrentHashMap中分段使用的原理非常相似,本质上都是不同的线程,在不同的单元上进行操作,减少了线程的竞争,提高了并发的效率。
LongAdder的内部成员包含一个base值 和 一个cells数组,在最初无竞争的时候,只能操作base的值。只有当线程执行CAS失败后,才会初始cells数组,并为线程分配所对应的元素。
LongAdder的设计目的是为了在高并发情况下提供更好的性能,相比于传统的原子操作类(比如 AtomicLong),在高并发下可以减少竞争。
- LongAdder 的核心思想是将一个 long 类型的值分解成多个单元(cell),每个单元都是独立的,每个单元保存一部分的累加值。
- 当多个线程同时对 LongAdder 进行累加操作时,LongAdder 会根据线程的哈希值选择不同的单元进行累加操作,这样可以减小竞争。如果发生了哈希冲突,会采用 CAS 操作尝试更新单元的值。
- 当需要获取当前值时,LongAdder 会将所有单元的值累加起来,得到最终的结果。由于每个单元都是独立的,所以在获取值的时候不需要加锁,可以并行地进行。这样就减小了获取值时的开销,提高了性能。
核心优势
- 减小竞争:通过分段累加和并行更新,减小了多线程下的竞争,提高了性能。
- 并行计算:在获取值时,可以并行地将所有单元的值累加起来,减小了获取值时的开销。
总的来说,LongAdder 的设计思路清晰,将累加操作分解成多个独立的单元,通过分段累加和并行计算提高了在高并发情况下的性能表现。
3.1.基类Striped64内部的三个重要成员
LongAdder继承 Striped64类,base值和cells数组都在Striped64类中定义,其中比较重要的几个成员有
/** * Table of cells. When non-null, size is a power of 2. */ transient volatile Cell[] cells; /** * Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. */ transient volatile long base; /** * Spinlock (locked via CAS) used when resizing and/or creating Cells. */ transient volatile int cellsBusy;
1. cells
- 描述:
cells
是一个volatile Cell[]
类型的数组,用于存储多个单元(Cell
)。存放cell的哈希表,大小为2的幂 - 作用:每个单元(
Cell
)都包含一个volatile long
类型的变量,用于存储一部分累加值。cells
数组的每个元素都可以被多个线程同时访问和修改,用于存储累加值的其他部分。
2. base
- 描述:
base
是一个volatile long
类型的变量,用于存储一部分累加值。 - 作用:
base
可以被多个线程同时访问和修改,用于存储累加值的一部分。
3. cellsBusy
- 描述:
cellsBusy
是一个volatile int
类型的变量,用作自旋锁。 - 作用:在进行数组扩容或者创建新的
Cell
单元时,通过 CAS 操作来获取锁,以保证只有一个线程进行扩容或者创建操作。
Striped64的整体值value获取如下
// LongAdder中
/**
* 获取当前累加器的值(long类型)。
* @return 当前累加器的值
*/
public long longValue() {
return sum();
}
/**
* 计算累加器中所有单元值的总和。
*
* @return 累加器中所有单元值的总和
*/
public long sum() {
// 获取单元数组的引用
Cell[] cs = cells;
// 初始化总和为基本值
long sum = base;
// 如果单元数组不为null,则遍历每个单元并累加值到总和
if (cs != null) {
for (Cell c : cs)
if (c != null)
sum += c.value;
}
return sum; // 返回总和
}
sum()
方法首先获取了当前累加器中的单元数组cells
的引用,并将基本值base
赋值给sum
变量,作为初始总和。- 然后,如果单元数组
cells
不为 null,就遍历每个单元,将单元中的值累加到总和sum
中。 - 最后返回总和
sum
,即为累加器中所有单元值的总和。
3.2.LongAdder.add()方法
作为示例这里分析一下LongAdder的add方法
/**
* 将给定的值添加到累加器中。
*
* @param x 要添加的值
*/
public void add(long x) {
Cell[] cs; // 单元数组
long b, v; // 基本值和单元值
int m; // 单元数组长度减一
Cell c; // 单元对象
// 如果单元数组不为null,或者通过CAS操作将基本值更新为原基本值加上x成功
if ((cs = cells) != null || !casBase(b = base, b + x)) {
// 获取当前线程的哈希值
int index = getProbe();
// 默认情况下,当前线程没有争用(即不与其他线程竞争)
boolean uncontended = true;
// 如果单元数组为空,或者单元数组长度减一小于0,或者单元数组中的单元为空,或者单元中的值通过CAS操作更新失败
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[index & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
// 调用longAccumulate方法进行累加操作
longAccumulate(x, null, uncontended, index);
}
}
/**
* 通过CAS操作来更新基本值。
*
* @param cmp 期望值
* @param val 新值
* @return 如果更新成功返回true,否则返回false
*/
final boolean casBase(long cmp, long val) {
// 调用 BASE 中的 weakCompareAndSetRelease 方法,尝试使用 CAS 操作更新基本值
return BASE.weakCompareAndSetRelease(this, cmp, val);
}
add
方法首先尝试通过 CAS 操作将给定的值累加到基本值base
上。如果成功,方法结束。- 如果 CAS 操作失败,说明存在竞争或者
cells
数组不为 null,则获取当前线程的哈希值,并尝试更新对应的单元。 - 如果更新单元的过程中发生竞争或者单元数组为空,则调用
longAccumulate
方法进行累加操作。 - 这种设计能够有效地减少竞争,提高并发性能,尤其在高并发情况下,避免了多个线程同时更新同一个单元造成的性能下降。
3.3.LongAdder中longAccumulate()方法
longAccumulate()方法是Striped64中重要的方法,主要是实现不同线程更新各自Cell中的值,其逻辑类似于分段式锁。
/**
* 通过 CAS 操作来累加给定的值。
*
* @param x 要累加的值
* @param fn 用于累加的操作函数
* @param wasUncontended 标志位,表示当前线程是否为无竞争状态
* @param index 当前线程的哈希码
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended, int index) {
// 如果哈希码为0,则重新计算哈希码并标记为无竞争状态
if (index == 0) {
ThreadLocalRandom.current(); // 强制初始化随机数生成器
index = getProbe(); // 获取当前线程的哈希码
wasUncontended = true; // 标记为无竞争状态
}
// 循环尝试进行累加操作
for (boolean collide = false;;) { // 标志位,表示上一次操作是否发生哈希冲突
Cell[] cs; // 单元数组
Cell c; // 单元对象
int n; // 单元数组长度
long v; // 单元值
// 如果单元数组不为null且长度大于0
if ((cs = cells) != null && (n = cs.length) > 0) {
// 计算单元数组中的索引
if ((c = cs[(n - 1) & index]) == null) {
// 如果对应的单元为空
if (cellsBusy == 0) { // 尝试创建新的单元
Cell r = new Cell(x); // 乐观地创建单元
if (cellsBusy == 0 && casCellsBusy()) { // 尝试获取单元数组更新锁
try { // 在锁定状态下重新检查
Cell[] rs; // 重新检查后的单元数组
int m, j; // 单元数组长度和索引
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & index] == null) {
rs[j] = r; // 更新单元数组
break; // 跳出循环
}
} finally {
cellsBusy = 0; // 释放单元数组更新锁
}
continue; // 重新尝试操作
}
}
collide = false; // 未发生哈希冲突
}
else if (!wasUncontended) // 如果已知CAS操作失败
wasUncontended = true; // 继续哈希冲突后的操作
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break; // 累加操作成功,跳出循环
else if (n >= NCPU || cells != cs)
collide = false; // 达到最大大小或者单元数组失效
else if (!collide)
collide = true; // 发生哈希冲突
else if (cellsBusy == 0 && casCellsBusy()) { // 尝试获取单元数组更新锁
try { // 在锁定状态下重新检查
if (cells == cs) // 如果单元数组未被其他线程修改
cells = Arrays.copyOf(cs, n << 1); // 扩容单元数组
} finally {
cellsBusy = 0; // 释放单元数组更新锁
}
collide = false; // 未发生哈希冲突
continue; // 重新尝试操作
}
index = advanceProbe(index); // 更新哈希码
}
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) { // 尝试获取单元数组更新锁
try { // 在锁定状态下初始化单元数组
if (cells == cs) { // 双重检查
Cell[] rs = new Cell[2]; // 创建新的单元数组
rs[index & 1] = new Cell(x); // 初始化单元
cells = rs; // 更新单元数组
break; // 跳出循环
}
} finally {
cellsBusy = 0; // 释放单元数组更新锁
}
}
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x))) // 使用基本值进行累加操作
break; // 累加操作成功,跳出循环
}
}
这里主要分为 五个个部分,四个判断(其实主要就是后三个判断),一个自旋更新
- 第一个 if 分支:
- 当前线程的哈希码为0时,重新计算哈希码并标记为无竞争状态。
- 通过
getProbe()
方法获取当前线程的哈希码。 - 如果当前线程的哈希码为0,强制初始化随机数生成器,然后重新获取哈希码,并标记为无竞争状态。
- 循环部分:
- 这是一个无限循环,直到累加操作成功并跳出循环。
- 有一个
collide
变量用于标记上一次操作是否发生了哈希冲突。
- 第二个 if 分支:
- 如果
cells
数组不为 null 且长度大于 0。 - 获取
cells
数组中对应的单元。 - 如果单元为空,则尝试创建新的单元,如果成功则更新单元数组并跳出循环,否则继续尝试操作。
- 如果单元不为空,则尝试进行 CAS 操作更新单元的值,如果成功则累加操作完成并跳出循环,否则继续尝试操作。
- 如果单元数组达到了最大大小或者单元数组失效,则标记为未发生哈希冲突。
- 如果发生了哈希冲突且没有其他线程在扩容单元数组,则尝试扩容单元数组并继续操作。
- 如果
- 第三个 if 分支:
- 如果
cells
数组为 null 且没有其他线程在初始化单元数组,则尝试初始化单元数组。 - 如果成功初始化单元数组,则在其中创建一个新的单元并跳出循环。
- 如果
- 最后一个 else 分支 :
- 如果无法避免竞争,则使用基本值进行累加操作。
- 如果成功,则累加操作完成并跳出循环。
3.4.LongAdder.casCellsBusy()方法
casCellsBusy
方法是 LongAdder
类中的一个私有方法,用于通过 CAS(比较并交换)操作尝试获取单元数组更新锁。当 cellsBusy
的值为0时,表示锁是空闲的,此时该方法会尝试将 cellsBusy
的值从0更新为1,即获取锁。如果成功获取锁,则表示当前线程获得了单元数组的更新权限,可以进行单元数组的更新操作;如果获取锁失败,说明有其他线程已经持有了更新锁,当前线程需要等待。
初始时,LongAdder
中的 cells
数组为 null
,此时 casCellsBusy
的主要作用是在第一个线程进行累加操作时,尝试初始化单元数组并获取更新锁。如果当前没有其他线程持有更新锁,那么第一个线程通过 casCellsBusy
方法可以成功获取更新锁,并在更新锁的保护下进行单元数组的初始化。如果有其他线程在尝试初始化单元数组,那么第一个线程就需要等待,直到其他线程释放了更新锁。
/**
* 通过 CAS 操作来尝试获取单元数组更新锁。
*
* @return 如果成功获取锁则返回 true,否则返回 false
*/
final boolean casCellsBusy() {
// 使用 CELLSBUSY 的 compareAndSet 方法尝试将单元数组更新锁从0更新为1
return CELLSBUSY.compareAndSet(this, 0, 1);
}
注意
当 cellsBusy
成员值为1时,表示 cells
数组正在被某个线程执行初始化或者扩容操作。在这种情况下,其他线程不能立即进行如下操作:
- 无法更新单元数组的值:由于可能存在单元数组的结构变化(例如初始化或者扩容),其他线程不能直接更新单元数组的值。因为这样做可能会导致并发冲突或者数据丢失。
- 无法获取单元数组的更新锁:
casCellsBusy
方法会失败,因为单元数组的更新锁已经被其他线程持有。这意味着其他线程不能立即获取到单元数组的更新权限,需要等待当前的初始化或者扩容操作完成后才能尝试获取锁并执行更新操作。 - 无法创建新的单元:当某个线程尝试创建新的单元时,如果单元数组的某个位置为
null
,那么这个线程也不能立即将新的单元放置到该位置。因为该位置可能在被其他线程进行初始化或者扩容操作,直接放置新的单元可能导致数据丢失或者覆盖其他线程的操作。
因此,在 cellsBusy
成员值为1时,其他线程必须等待当前的初始化或者扩容操作完成后,才能安全地进行对 cells
数组的操作,以确保线程安全和数据一致性。