AbstractQueuedSynchronizer(AQS),是阻塞式鎖和同步器工具的框架。本文將初步介紹Java中AQS的基本原理,並基於AQS實現自定義阻塞式不可重入鎖,以此來演示AQS的使用。下期會以 ReentrantLock 為例,從源碼的層面介紹 AQS 的核心實現 acquire() 方法。
AQS 的目標
- 提供阻塞式獲取鎖 acquire() 和非阻塞式嘗試獲取鎖 tryAcquire();
- 提供獲取鎖超時機制;
- 通過打斷取消機制;
- 獨佔機制與共享機制;
- 條件不滿足時的等待機制;
AQS 的設計
-
使用 state 表示資源狀態
- 使用 volatile 配合 cas 保證其修改時的原子性;
- 使用 32bit int 來維護同步狀態。
-
使用先進先出隊列
- 借鑑 CLH 隊列:無鎖,使用自旋;快速,無阻塞;
- 頭節點 head 不存儲數據,尾節點 指向隊列最後一個等待線程。
-
使用 park & unpark 來調度線程
- 可以先 unpark() 再 park();
- unpark & park 針對線程,而不是同步器,控制粒度更為精細;
- park() 可通過 interrupt() 打斷。
-
使用條件變量實現等待、喚醒機制
- 每個條件變量維護一個條件變量隊列;
- 線程等待:先將線程從等待隊列中移除,再將線程加入條件變量隊列;
- 線程喚醒:先將線程從條件變量隊列中移除,再將線程加入等待隊列。
等待隊列和條件變量隊列的維護已由 AQS 實現,AQS 的子類只需定義如何維護 state、如何獲取和釋放鎖,主要需要實現的方法如下
- tryAcquire():嘗試獲取獨佔鎖;
- tryRelease():嘗試釋放獨佔鎖;
- tryAcquireShared():嘗試獲取共享鎖;
- tryReleaseShared():嘗試釋放共享鎖;
- isHeldExclusively():判斷當前線程是否持有鎖。
AQS 分為獨佔模式和共享模式,獨佔模式同時只允許一個線程訪問資源,共享模式同時允許多個線程訪問資源。
使用 AQS 的主要併發工具類如下圖所示,主要包括:ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。
自定義阻塞式不可重入鎖
自定義同步器
自定義獨佔式同步器需要重寫 AQS 的如下方法:
- tryAcquire():嘗試獲取鎖,約定 state 為 0 表示未加鎖,為 1 表示已加鎖。
- tryRelease():嘗試釋放鎖
- isHeldExclusively():判斷當前線程是否持有鎖
- newCondition():創建條件變量
static class ISync extends AbstractQueuedSynchronizer {
/**
* 嘗試獲取鎖
* @param arg 忽略
*/
@Override
protected boolean tryAcquire(int arg) {
if (
// 將state的值置為1
compareAndSetState(0, 1)
) {
// 設置owner為當前線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 嘗試釋放鎖
* @param arg 忽略
*/
@Override
protected boolean tryRelease(int arg) {
// 持有鎖的線程才可以釋放鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 注意setExclusiveOwnerThread和setState的執行順序
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 是否持有鎖
*/
@Override
protected boolean isHeldExclusively() {
// 判斷鎖的持有者是否為當前線程
return getExclusiveOwnerThread() == Thread.currentThread();
}
/**
* 創建條件變量
*/
public Condition newCondition() {
return new ConditionObject();
}
}
在 tryRelease() 方法中,注意 setExclusiveOwnerThread() 和 setState() 的執行順序,state 被 volatile 修飾,而 exclusiveOwnerThread 沒有,因此,後設置 state 可保證 exclusiveOwnerThread 的設置對其他線程可見。
private transient Thread exclusiveOwnerThread;
private volatile int state;
自定鎖
使用自定義同步器實現自定義阻塞式不可重入鎖。
class ILock implements Lock {
// 自定義同步器
private ISync sync = new ISync();
/**
* 加鎖
* 加鎖失敗則進入隊列等待
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
* 加鎖
* 加鎖失敗則進入隊列等待,在加鎖的過程中可被打斷
*/
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 嘗試加鎖
* 加鎖失敗則返回false
*/
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 嘗試加鎖
* 指定時間內加鎖失敗則返回false
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
/**
* 解鎖
*/
@Override
public void unlock() {
sync.release(1);
}
/**
* 創建條件變量
*/
@Override
@NonNull
public Condition newCondition() {
return sync.newCondition();
}
}
自定義鎖中使用到的 AQS 中的方法
public abstract class AbstractQueuedSynchronizer {
// 嘗試獲取鎖
public final void acquire(int arg) {
// 獲取鎖失敗
if (!tryAcquire(arg))
// 加入阻塞隊列
acquire(null, arg, false, false, false, 0L);
}
// 嘗試獲取鎖(可打斷)
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (
// 線程被打斷
Thread.interrupted() ||
(
// 獲取鎖失敗
!tryAcquire(arg) &&
// 加入阻塞隊列的過程中出現問題併成功取消獲取鎖
acquire(null, arg, false, true, false, 0L) < 0)
)
throw new InterruptedException();
}
// 釋放鎖
public final boolean release(int arg) {
// 釋放鎖成功
if (tryRelease(arg)) {
// 喚醒阻塞的線程
signalNext(head);
return true;
}
return false;
}
}
END
文章文檔:公眾號 字節幺零二四 回覆關鍵字可獲取本文文檔。
如果覺得本文對您有一點點幫助,歡迎點贊、轉發加關注,這會對我有非常大的幫助,咱們下期見!