并发包中的AQS
# 并发包中的AQS
AQS(AbstractQueuedSynchronizer)是Java并发编程的核心基础框架,它提供了一套通用的同步器实现模板,简化了锁和其他同步工具的开发。理解AQS是掌握Java并发包(
java.util.concurrent
,简称JUC)的关键。理解AQS的设计(状态、队列、模板方法)是掌握Java高并发编程的核心,几乎所有JUC工具都基于它构建,开发者也可通过AQS快速实现自定义锁。
# AQS是什么?
定义:AQS是一个抽象类,通过维护一个共享资源状态(volatile int state
)和一个FIFO双向队列(CLH队列),实现了线程的排队、阻塞、唤醒机制。开发者只需通过重写AQS的模板方法(如tryAcquire
/tryRelease
),即可快速构建自定义锁或同步工具。
核心思想:
“资源状态 + 队列管理”。如果线程请求资源失败(如锁已被占用),AQS会自动将该线程封装为队列节点(Node
),加入等待队列,并通过自旋或阻塞(LockSupport.park()
)等待唤醒。
# AQS为并发编程解决了什么问题?
- 统一管理线程排队与唤醒: 开发者无需手动处理复杂的线程阻塞、唤醒和队列调度逻辑,只需关注资源状态的获取与释放规则。
- 支持多种同步模式: AQS提供独占模式(如ReentrantLock)**和**共享模式(如Semaphore),并可混合使用(如ReadWriteLock)。
- 高性能与可扩展性:
通过CAS操作(Compare-And-Swap)实现无锁化状态更新,减少线程上下文切换开销。例如,
state
的修改通过Unsafe
类直接操作内存,保证原子性。
它为什么能做到这些呢?一定是它具有某些独特的结构,接下来一起看看,它的内部实现(待补充)
# AQS的核心机制
(1) 资源状态(state
)
- 作用:表示同步资源是否可用(如锁是否被持有、信号量的剩余许可数)。
- 操作:通过
getState()
、setState()
、compareAndSetState()
方法修改状态,底层依赖CAS保证线程安全。
(2) CLH队列(等待队列)
- 结构:双向链表,每个节点(
Node
)封装一个等待线程。 - 行为:
- 线程获取资源失败时,被包装为节点加入队列尾部。
- 资源释放时,AQS按FIFO规则唤醒队列中的下一个线程(公平锁)或直接竞争(非公平锁)。
(3) 模板方法模式
开发者需重写以下方法定义资源获取/释放的规则:
- 独占模式:
tryAcquire(int arg)
、tryRelease(int arg)
- 共享模式:
tryAcquireShared(int arg)
、tryReleaseShared(int arg)
# 基于AQS的工具们
几乎所有JUC中的锁和同步工具都直接或间接依赖AQS:
(1) ReentrantLock(独占锁)
- 实现:基于AQS的独占模式,
state
表示锁的重入次数(0=未锁定,≥1=被持有)。 - 公平性: 通过构造函数选择公平锁(按队列顺序获取)或非公平锁(允许插队)。
(2) Semaphore(信号量)
- 实现:
基于AQS的共享模式,
state
表示可用许可证数量。acquire()
/release()
对应许可证的获取和释放。
(3) CountDownLatch(倒计时闩)
- 实现:
state
初始化后表示需要等待的计数。调用countDown()
减少计数,await()
阻塞直到state=0
。
(4) CyclicBarrier(循环栅栏)
- 间接依赖:
通过
ReentrantLock
和Condition
(AQS的条件变量ConditionObject
)实现线程的等待与唤醒。
(5) ReentrantReadWriteLock(读写锁)
- 实现:将AQS的
state
拆分为高16位(读锁计数)和低16位(写锁计数),通过两个内部类(读锁共享、写锁独占)实现读写分离。
(6) FutureTask(异步任务)
- 实现:通过AQS的
state
表示任务状态(未启动、已完成、已取消),控制get()
方法的阻塞行为。
# AQS的优势
代码复用:
JUC中的同步工具只需实现资源状态逻辑,无需重复编写队列管理代码。
复杂性封装:
AQS处理了线程阻塞、唤醒、超时、中断等底层细节,开发者聚焦业务逻辑。
高性能:
通过CLH队列和CAS减少竞争,避免不必要的线程切换。
灵活性:
支持公平/非公平、独占/共享、可重入等特性,覆盖绝大多数并发场景。
# 一个例子
- ReentrantLock如何基于AQS工作
// ReentrantLock的Sync内部类继承AQS
static final class NonfairSync extends Sync {
// 尝试获取锁(非公平)
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // CAS抢锁
setExclusiveOwnerThread(current); // 设置持有线程
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 重入
setState(c + acquires);
return true;
}
return false; // 获取失败,进入队列等待
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18