一、Volatile
通過前面內容我們瞭解了synchronized,雖然JVM對它做了很多優化,但是它還是一個重量級的鎖。而接下來要介紹的volatile則是輕量級的synchronized。如果一個變量使用volatile,則它比使用synchronized的成本更加低,因為它不會引起線程上下文的切換和調度。
Java語言規範對volatile的定義:Java允許線程訪問共享變量,為了確保共享變量能被準確和一致地更新,線程應該確保通過排他鎖單獨獲得這個變量。
通俗點講就是説一個變量如果用volatile修飾了,則Java可以確保所有線程看到這個變量的值是一致的,如果某個線程對volatile修飾的共享變量進行更新,那麼其他線程可以立馬看到這個更新,這就是內存可見性。
volatile雖然看起來比較簡單,使用起來無非就是在一個變量前面加上volatile即可,但是要用好並不容易。
1. 解決內存可見性問題
在可見性問題案例中進行如下修改,添加volatile關鍵詞:
private volatile boolean flag = true;
Volatile實現內存可見性的過程
線程寫Volatile變量的過程:
1)改變線程本地內存中Volatile變量副本的值;
2)將改變後的副本的值從本地內存刷新到主內存
線程讀Volatile變量的過程:
1)從主內存中讀取Volatile變量的最新值到線程的本地內存中
2)從本地內存中讀取Volatile變量的副本
Volatile實現內存可見性原理:
寫操作時,通過在寫操作指令後加入一條store屏障指令,讓本地內存中變量的值能夠刷新到主內存中
讀操作時,通過在讀操作前加入一條load屏障指令,及時讀取到變量在主內存的值
PS: 內存屏障(Memory Barrier)是一種CPU指令,用於控制特定條件下的重排序和內存可見性問題。Java編譯器也會根據內存屏障的規則禁止重排序
volatile的底層實現是通過插入內存屏障,但是對於編譯器來説,發現一個最優佈置來最小化插入內存屏障的總數幾乎是不可能的,所以,JMM採用了保守策略。如下:
- StoreStore屏障可以保證在volatile寫之前,其前面的所有普通寫操作都已經刷新到主內存中。
- StoreLoad屏障的作用是避免volatile寫與後面可能有的volatile讀/寫操作重排序。
- LoadLoad屏障用來禁止處理器把上面的volatile讀與下面的普通讀重排序。
- LoadStore屏障用來禁止處理器把上面的volatile讀與下面的普通寫重排序。
2. 原子性的問題
雖然Volatile 關鍵字可以讓變量在多個線程之間可見,但是Volatile不具備原子性。
public class Demo3Volatile {
public static void main(String[] args) throws InterruptedException {
VolatileDemo demo = new VolatileDemo();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(demo);
t.start();
}
Thread.sleep(1000);
System.out.println(demo.count);
}
static class VolatileDemo implements Runnable {
public volatile int count;
//public volatile AtomicInteger count = new AtomicInteger(0);
public void run() {
addCount();
}
public void addCount() {
for (int i = 0; i < 10000; i++) {
count++;
}
}
}
}
以上出現原子性問題的原因是count++並不是原子性操作。
count = 5 開始,流程分析:
- 線程1讀取count的值為5
- 線程2讀取count的值為5
- 線程2加1操作
- 線程2最新count的值為6
- 線程2寫入值到主內存的最新值為6
這個時候,線程1的count為5,線程2的count為6
如果切換到線程1執行,那麼線程1得到的結果是6,寫入到主內存的值還是6
現在的情況是對count進行了兩次加1操作,但是主內存實際上只是加1一次
解決方案:
- 使用synchronized
- 使用ReentrantLock(可重入鎖)
- 使用AtomicInteger(原子操作)
使用synchronized
public synchronized void addCount() {
for (int i = 0; i < 10000; i++) {
count++;
}
}
使用ReentrantLock(可重入鎖)
//可重入鎖
private Lock lock = new ReentrantLock();
public void addCount() {
for (int i = 0; i < 10000; i++) {
lock.lock();
count++;
lock.unlock();
}
}
使用AtomicInteger(原子操作)
public static AtomicInteger count = new AtomicInteger(0);
public void addCount() {
for (int i = 0; i < 10000; i++) {
//count++;
count.incrementAndGet();
}
}
3. Volatile 適合使用場景
1)對變量的寫入操作不依賴其當前值
不滿足:number++、count=count*5等
滿足:boolean變量、直接賦值的變量等
2)該變量沒有包含在具有其他變量的不變式中
不滿足:不變式 low<up
總結:變量真正獨立於其他變量和自己以前的值,在單獨使用的時候,適合用volatile
4. synchronized和volatile比較
1)volatile不需要加鎖,比synchronized更輕便,不會阻塞線程
2)synchronized既能保證可見性,又能保證原子性,而volatile只能保證可見性,無法保證原子性
與鎖相比,Volatile 變量是一種非常簡單但同時又非常脆弱的同步機制,它在某些情況下將提供優於鎖的性能和伸縮性。如果嚴格遵循 volatile 的使用條件(變量真正獨立於其他變量和自己以前的值 ) 在某些情況下可以使用 volatile 代替 synchronized 來優化代碼提升效率。
二、J.U.C之CAS
J.U.C 即 java.util.concurrent,是 JSR 166 標準規範的一個實現; JSR 166 以及 J.U.C 包的作者是 Doug Lea 。
J.U.C 框架是 Java 5 中引入的,而我們最熟悉的線程池機制就在這個包,J.U.C 框架包含的內容有:
- AbstractQueuedSynchronizer(AQS框架),J.U.C 中實現鎖和同步機制的基礎;
- Locks & Condition(鎖和條件變量),比 synchronized、wait、notify 更細粒度的鎖機制;
- Executor 框架(線程池、Callable、Future),任務的執行和調度框架;
- Synchronizers(同步器),主要用於協助線程同步,有 CountDownLatch、CyclicBarrier、Semaphore、Exchanger;
- Atomic Variables(原子變量),方便程序員在多線程環境下,無鎖的進行原子操作,核心操作是 CAS 原子操作,所謂的 CAS 操作,即 compare and swap,指的是將預期值與當前變量的值比較(compare),如果相等則使用新值替換(swap)當前變量,否則不作操作;
- BlockingQueue(阻塞隊列),阻塞隊列提供了可阻塞的入隊和出對操作,如果隊列滿了,入隊操作將阻塞直到有空間可用,如果隊列空了,出隊操作將阻塞直到有元素可用;
- Concurrent Collections(併發容器),説到併發容器,不得不提同步容器。在 JDK1.5 之前,為了線程安全,我們一般都是使用同步容器,同步容器主要的缺點是:對所有容器狀態的訪問都串行化,嚴重降低了併發性;某些複合操作,仍然需要加鎖來保護;迭代期間,若其它線程併發修改該容器,會拋出 ConcurrentModificationException 異常,即快速失敗機制;
- Fork/Join 並行計算框架,這塊內容是在 JDK1.7 中引入的,可以方便利用多核平台的計算能力,簡化並行程序的編寫,開發人員僅需關注如何劃分任務和組合中間結果;
- TimeUnit 枚舉,TimeUnit 是 java.util.concurrent 包下面的一個枚舉類,TimeUnit 提供了可讀性更好的線程暫停操作,以及方便的時間單位轉換方法;
1. CAS介紹
CAS,Compare And Swap,即比較並交換。同步組件中大量使用CAS技術實現了Java多線程的併發操作。整個AQS同步組件、Atomic原子類操作等等都是以CAS實現的,甚至ConcurrentHashMap在1.8的版本中也調整為了CAS+Synchronized。可以説CAS是整個JUC的基石。
2. CAS原理剖析
再次測試之前Volatile的例子,把循環的次數調整為一億(保證在一秒之內不能遍歷完成,從而測試三種原子操作的性能),我們發現,AtomicInteger原子操作性能最高,他是用的就是CAS。
2.1 synchronized同步分析
注意,本小節是解釋synchronized性能低效的原因,只要能理解synchronized同步過程其實還需要做很多事,這些邏輯的執行都需要佔用資源,從而導致性能較低,是為了對比CAS的高效。這部分分析過於深入JMM底層原理,不適合初級甚至中級程序員學習。
我們之前講過,synchronized的同步操作主要是monitorenter和monitorexit這兩個jvm指令實現的,我們先寫一段簡單的代碼:
public class Demo2Synchronized {
public void test2() {
synchronized (this) {
}
}
}
在cmd命令行執行javac編譯和javap -c Java 字節碼的指令
javac Demo2Synchronized.java
javap -c Demo2Synchronized.class
從結果可以看出,同步代碼塊是使用monitorenter和monitorexit這兩個jvm指令實現的:
monitorenter和monitorexit這兩個jvm指令實現鎖的使用,主要是基於 Mark Word和、monitor。
Mark Word
Hotspot虛擬機的對象頭主要包括兩部分數據:Mark Word(標記字段)、Klass Pointer(類型指針)。其中Klass Point是是對象指向它的類元數據的指針,虛擬機通過這個指針來確定這個對象是哪個類的實例,Mark Word用於存儲對象自身的運行時數據,它是synchronized實現輕量級鎖和偏向鎖的關鍵。
Mark Word用於存儲對象自身的運行時數據,如哈希碼(HashCode)、GC分代年齡、鎖狀態標誌、線程持有的鎖、偏向線程 ID、偏向時間戳等等。Java對象頭一般佔有兩個機器碼(在32位虛擬機中,1個機器碼等於4字節,也就是32bit),但是如果對象是數組類型,則需要三個機器碼,因為JVM虛擬機可以通過Java對象的元數據信息確定Java對象的大小,但是無法從數組的元數據來確認數組的大小,所以用一塊來記錄數組長度。下圖是Java對象頭的存儲結構(32位虛擬機):
對象頭信息是與對象自身定義的數據無關的額外存儲成本,但是考慮到虛擬機的空間效率,Mark Word被設計成一個非固定的數據結構以便在極小的空間內存存儲儘量多的數據,它會根據對象的狀態複用自己的存儲空間,也就是説,Mark Word會隨着程序的運行發生變化,變化狀態如下(32位虛擬機):
- monitor
什麼是Monitor?我們可以把它理解為一個同步工具,也可以描述為一種同步機制,它通常被描述為一個對象。與一切皆對象一樣,所有的Java對象是天生的Monitor,每一個Java對象都有成為Monitor的潛質,因為在Java的設計中 ,每一個Java對象都帶了一把看不見的鎖,它叫做內部鎖或者Monitor鎖。
Monitor 是線程私有的數據結構,每一個線程都有一個可用monitor record列表,同時還有一個全局的可用列表。每一個被鎖住的對象都會和一個monitor關聯(對象頭的MarkWord中的LockWord指向monitor的起始地址),同時monitor中有一個Owner字段存放擁有該鎖的線程的唯一標識,表示該鎖被這個線程佔用。其結構如下:
- Owner:初始時為NULL表示當前沒有任何線程擁有該monitor record,當線程成功擁有該鎖後保存線程唯一標識,當鎖被釋放時又設置為NULL;
- EntryQ:關聯一個系統互斥鎖(semaphore),阻塞所有試圖鎖住monitor record失敗的線程。
- RcThis:表示blocked或waiting在該monitor record上的所有線程的個數。
- Nest:用來實現重入鎖的計數。
- HashCode:保存從對象頭拷貝過來的HashCode值(可能還包含GC age)。
- Candidate:用來避免不必要的阻塞或等待線程喚醒,因為每一次只有一個線程能夠成功擁有鎖,如果每次前一個釋放鎖的線程喚醒所有正在阻塞或等待的線程,會引起不必要的上下文切換(從阻塞到就緒然後因為競爭鎖失敗又被阻塞)從而導致性能嚴重下降。Candidate只有兩種可能的值0表示沒有需要喚醒的線程1表示要喚醒一個繼任線程來競爭鎖。
2.2 CAS原理
在上一部分,我們介紹了synchronized底層做了大量的工作,才實現同步,而同步保證了原子操作。但是不可避免的是性能較低。CAS是如何提高性能的呢?
CAS的思想很簡單:三個參數,一個當前內存值V、舊的預期值A、即將更新的值B,當且僅當舊的預期值A和內存值V相同時,將內存值修改為B並返回true,否則什麼都不做,並返回false。如果CAS操作失敗,通過自旋的方式等待並再次嘗試,直到成功。
CAS在 先比較後修改 這個CAS過程中,根本沒有獲取鎖,釋放鎖的操作,是硬件層面的原子操作,跟JMM內存模型沒有關係。大家可以理解為直接使用其他的語言,在JVM虛擬機之外直接操作計算機硬件,正因為如此,對比synchronized的同步,少了很多的邏輯步驟,使得性能大為提高。
JUC下的atomic類都是通過CAS來實現的,下面就是一個AtomicInteger原子操作類的例子,在其中使用了Unsafe unsafe = Unsafe.getUnsafe()。Unsafe 是CAS的核心類,它提供了硬件級別的原子操作。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//操作的值也進行了volatile修飾,保證內存可見性
private volatile int value;
繼續查看AtomicInteger的addAndGet()方法:
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
其內部調用unsafe的getAndAddInt方法,查看看compareAndSwapInt方法,該方法為native方法,有四個參數,分別代表:對象、對象的地址、預期值、修改值。
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
Unsafe 是一個比較危險的類,主要是用於執行低級別、不安全的方法集合。儘管這個類和所有的方法都是公開的(public),但是這個類的使用仍然受限,你無法在自己的java程序中直接使用該類,因為只有授信的代碼才能獲得該類的實例。可是為什麼Unsafe的native方法就可以保證是原子操作呢?
3. native關鍵詞
前面提到了sun.misc.Unsafe這個類,裏面的方法使用native關鍵詞聲明本地方法,為什麼要用native?
Java無法直接訪問底層操作系統,但有能力調用其他語言編寫的函數or方法,是通過JNI(Java Native Interfface)實現。使用時,通過native關鍵字告訴JVM這個方法是在外部定義的。但JVM也不知道去哪找這個原生方法,此時需要通過javah命令生成.h文件。
示例步驟(c語言為例):
- javac生成.class文件,比如javac NativePeer.java
- javah生成.h文件,比如javah NativePeer
- 編寫c語言文件,在其中include進上一步生成的.h文件,然後實現其中聲明而未實現的函數
- 生成dll共享庫,然後Java程序load庫,調用即可
native可以和任何除abstract外的關鍵字連用,這也説明了這些方法是有實體的,並且能夠和其他Java方法一樣,擁有各種Java的特性。
native方法有效地擴充了jvm,實際上我們所用的很多代碼已經涉及到這種方法了,通過非常簡潔的接口幫我們實現Java以外的工作。
native優勢:
- 很多層次上用Java去實現是很麻煩的,而且Java解釋執行的效率也差了c語言啥的很多,純Java實現可能會導致效率不達標,或者可讀性奇差。
- Java畢竟不是一個完整的系統,它經常需要一些底層的支持,通過JNI和native method我們就可以實現jre與底層的交互,得到強大的底層操作系統的支持,使用一些Java本身沒有封裝的操作系統的特性。
4. 多CPU的CAS處理
CAS可以保證一次的讀-改-寫操作是原子操作,在單處理器上該操作容易實現,但是在多處理器上實現就有點兒複雜了。CPU提供了兩種方法來實現多處理器的原子操作:總線加鎖或者緩存加鎖。
- 總線加鎖:總線加鎖就是就是使用處理器提供的一個LOCK#信號,當一個處理器在總線上輸出此信號時,其他處理器的請求將被阻塞住,那麼該處理器可以獨佔使用共享內存。但是這種處理方式顯得有點兒霸道,不厚道,他把CPU和內存之間的通信鎖住了,在鎖定期間,其他處理器都不能其他內存地址的數據,其開銷有點兒大。
- 緩存加鎖:其實針對於上面那種情況我們只需要保證在同一時刻對某個內存地址的操作是原子性的即可。緩存加鎖就是緩存在內存區域的數據如果在加鎖期間,當它執行鎖操作寫回內存時,處理器不在輸出LOCK#信號,而是修改內部的內存地址,利用緩存一致性協議來保證原子性。緩存一致性機制可以保證同一個內存區域的數據僅能被一個處理器修改,也就是説當CPU1修改緩存行中的i時使用緩存鎖定,那麼CPU2就不能同時緩存了i的緩存行。
5. CAS缺陷
CAS雖然高效地解決了原子操作,但是還是存在一些缺陷的,主要表現在三個方法:循環時間太長、只能保證一個共享變量原子操作、ABA問題。
- 循環時間太長
如果CAS一直不成功呢?這種情況絕對有可能發生,如果自旋CAS長時間地不成功,則會給CPU帶來非常大的開銷。在JUC中有些地方就限制了CAS自旋的次數,例如BlockingQueue的SynchronousQueue。 - 只能保證一個共享變量原子操作
看了CAS的實現就知道這隻能針對一個共享變量,如果是多個共享變量就只能使用鎖了。 - ABA問題
CAS需要檢查操作值有沒有發生改變,如果沒有發生改變則更新。但是存在這樣一種情況:如果一個值原來是A,變成了B,然後又變成了A,那麼在CAS檢查的時候會發現沒有改變,但是實質上它已經發生了改變,這就是所謂的ABA問題。對於ABA問題其解決方案是加上版本號,即在每個變量都加上一個版本號,每次改變時加1,即A —> B —> A,變成1A —> 2B —> 3A。
CAS的ABA隱患問題,Java提供了AtomicStampedReference來解決。AtomicStampedReference通過包裝[E,Integer]的元組來對對象標記版本戳stamp,從而避免ABA問題。對於上面的案例應該線程1會失敗。
下面我們將通過一個例子可以可以看到AtomicStampedReference和AtomicInteger的區別。我們定義兩個線程,線程1負責將100 —> 110 —> 100,線程2執行 100 —>120,看兩者之間的區別。
public class Demo4ABA {
private static AtomicInteger ai = new AtomicInteger(100);
private static AtomicStampedReference air = new AtomicStampedReference(100, 1);
//ABA問題演示:
//1. 線程1先對數據進行修改 A-B-A過程
//2. 線程2也對數據進行修改 A-C的過程
public static void main(String[] args) throws InterruptedException {
// AtomicInteger可以看到不會有任何限制隨便改
// 線程2修改的時候也不可能知道要A-C 的時候,A是原來的A還是修改之後的A
Thread at1 = new Thread(new Runnable() {
public void run() {
ai.compareAndSet(100, 110);
ai.compareAndSet(110, 100);
}
});
Thread at2 = new Thread(new Runnable() {
public void run() {
try {
//為了讓線程1先執行完,等一會
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("AtomicInteger:" + ai.compareAndSet(100, 120));
System.out.println("執行結果:" + ai.get());
}
});
at1.start();
at2.start();
//順序執行,AtomicInteger案例先執行
at1.join();
at2.join();
//AtomicStampedReference可以看到每次修改都需要設置標識Stamp,相當於進行了1A-2B-3A的操作
//線程2進行操作的時候,雖然數值都一樣,但是可以根據標識很容易的知道A是以前的1A,還是現在的3A
Thread tsf1 = new Thread(new Runnable() {
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 預期引用:100,更新後的引用:110,預期標識getStamp() 更新後的標識getStamp() + 1
air.compareAndSet(100, 110, air.getStamp(), air.getStamp() + 1);
air.compareAndSet(110, 100, air.getStamp(), air.getStamp() + 1);
}
});
Thread tsf2 = new Thread(new Runnable() {
public void run() {
//tsf2先獲取stamp,導致預期時間戳不一致
int stamp = air.getStamp();
try {
TimeUnit.MILLISECONDS.sleep(100); //線程tsf1執行完
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("AtomicStampedReference:" + air.compareAndSet(100, 120, stamp, stamp + 1));
int[] stampArr = {stamp + 1};
System.out.println("執行結果:" + air.get(stampArr));
}
});
tsf1.start();
tsf2.start();
}
運行結果充分展示了AtomicInteger的ABA問題和AtomicStampedReference解決ABA問題。
三、J.U.C之atomic包
1. atomic包介紹
通過前面CAS的學習,我們瞭解到AtomicInteger的工作原理,它們的內部都維護者一個對應的基本類型的成員變量value,這個變量是被volatile關鍵字修飾的,保證多線程環境下看見的是同一個(可見性)。
AtomicInteger在進行一些原子操作的時候,依賴Unsafe類裏面的CAS方法,原子操作就是通過自旋方式,不斷地使用CAS函數進行嘗試直到達到自己的目的。
除了AtomicInteger類以外還有很多其他的類也有類似的功能,在JUC中有一個包java.util.concurrent.atomic存放原子操作的類,atomic裏的類主要包括:
- 基本類型 使用原子的方式更新基本類型
AtomicInteger:整形原子類 AtomicLong:長整型原子類 AtomicBoolean :布爾型原子類
- 引用類型
AtomicReference:引用類型原子類 AtomicStampedReference:原子更新引用類型裏的字段原子類 AtomicMarkableReference :原子更新帶有標記位的引用類型
- 數組類型 使用原子的方式更新數組裏的某個元素
AtomicIntegerArray:整形數組原子類 AtomicLongArray:長整形數組原子類 AtomicReferenceArray :引用類型數組原子類
- 對象的屬性修改類型
AtomicIntegerFieldUpdater:原子更新整形字段的更新器 AtomicLongFieldUpdater:原子更新長整形字段的更新器 AtomicReferenceFieldUpdater :原子更新引用類形字段的更新器
- JDK1.8新增類
DoubleAdder:雙浮點型原子類 LongAdder:長整型原子類 DoubleAccumulator:類似DoubleAdder,但要更加靈活(要傳入一個函數式接口) LongAccumulator:類似LongAdder,但要更加靈活(要傳入一個函數式接口)
雖然涉及到的類很多,但是原理和AtomicInteger都是一樣,使用CAS進行的原子操作,其方法和使用都是大同小異的。
2. 基本類型
- 使用原子的方式更新基本類型
AtomicInteger:整形原子類 AtomicLong:長整型原子類 AtomicBoolean :布爾型原子類
AtomicInteger主要API如下:
get() //直接返回值
getAndAdd(int) //增加指定的數據,返回變化前的數據
getAndDecrement() //減少1,返回減少前的數據
getAndIncrement() //增加1,返回增加前的數據
getAndSet(int) //設置指定的數據,返回設置前的數據
addAndGet(int) //增加指定的數據後返回增加後的數據
decrementAndGet() //減少1,返回減少後的值
incrementAndGet() //增加1,返回增加後的值
lazySet(int) //僅僅當get時才會set
compareAndSet(int, int)//嘗試新增後對比,若增加成功則返回true否則返回false
AtomicLong主要API和AtomicInteger,只是類型不是int,而是long
AtomicBoolean主要API如下:
compareAndSet(boolean, boolean) //參數1為原始值,參數2為修改的新值,若修改成功返回true,否則返回false
getAndSet(boolean)// 嘗試設置新的boolean值,直到成功為止,返回設置前的數據
3. 數組類型
使用原子的方式更新數組裏的某個元素
AtomicIntegerArray:整形數組原子類 AtomicLongArray:長整形數組原子類 AtomicReferenceArray :引用類型數組原子類
AtomicIntegerArray主要API如下:
addAndGet(int, int)//執行加法,第一個參數為數組的下標,第二個參數為增加的數量,返回增加後的結果
compareAndSet(int, int, int)// 對比修改,參1數組下標,參2原始值,參3修改目標值,成功返回true否則false
decrementAndGet(int)// 參數為數組下標,將數組對應數字減少1,返回減少後的數據
incrementAndGet(int)// 參數為數組下標,將數組對應數字增加1,返回增加後的數據
getAndAdd(int, int)// 和addAndGet類似,區別是返回值是變化前的數據
getAndDecrement(int)// 和decrementAndGet類似,區別是返回變化前的數據
getAndIncrement(int)// 和incrementAndGet類似,區別是返回變化前的數據
getAndSet(int, int)// 將對應下標的數字設置為指定值,第二個參數為設置的值,返回是變化前的數據
AtomicIntegerArray主要API和AtomicLongArray,只是類型不是int,而是long
AtomicIntegerArray案例:
public class Demo7AtomicIntegerArray {
public static void main(String[] args) throws InterruptedException {
int[] arr = {1, 2, 3, 4, 5};
AtomicIntegerArray aia = new AtomicIntegerArray(arr);
aia.compareAndSet(1, 2, 200);
System.out.println(aia.toString());
}
}
AtomicReferenceArray 主要API:
//參數1:數組下標;
//參數2:修改原始值對比;
//參數3:修改目標值
//修改成功返回true,否則返回false
compareAndSet(int, Object, Object)
//參數1:數組下標
//參數2:修改的目標
//修改成功為止,返回修改前的數據
getAndSet(int, Object)
AtomicReferenceArray 案例:
public class Demo8AtomicReferenceArray {
public static void main(String[] args) throws InterruptedException {
User u1 = new User("張三", 22);
User u2 = new User("李四", 33);
User[] arr = {u1, u2};
AtomicReferenceArray<User> ara = new AtomicReferenceArray<User>(arr);
System.out.println(ara.toString());
User u3 = new User("王五", 44);
ara.compareAndSet(0, u1, u3);
System.out.println(ara.toString());
}
static class User {
private String name;
public volatile int age;
public User(String name, int age) {
super();
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
}
4. 引用類型
AtomicReference:引用類型原子類 AtomicStampedRefrence:原子更新引用類型裏的字段原子類 AtomicMarkableReference :原子更新帶有標記位的引用類型
AtomicReference引用類型和基本類型的作用基本一樣,例子如下:
public class Demo5AtomicReference {
public static void main(String[] args) throws InterruptedException {
User u1 = new User("張三", 22);
User u2 = new User("李四", 33);
AtomicReference ar = new AtomicReference(u1);
ar.compareAndSet(u1, u2);
System.out.println(ar.get());
}
static class User {
private String name;
public volatile int age;
public User(String name, int age) {
super();
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
}
AtomicStampedReference其實它僅僅是在AtomicReference類的再一次包裝,裏面增加了一層引用和計數器,其實是否為計數器完全由自己控制,大多數我們是讓他自增的,你也可以按照自己的方式來標示版本號。案例參考前面的ABA例子
AtomicMarkableReference和AtomicStampedReference功能差不多,區別的是:它描述更加簡單的是與否的關係。通常ABA問題只有兩種狀態,而AtomicStampedReference是多種狀態。
public class Demo6AtomicMrkableReference {
public static void main(String[] args) throws InterruptedException {
User u1 = new User("張三", 22);
User u2 = new User("李四", 33);
//和AtomicStampedReference效果一樣,用於解決ABA的
//區別是表示不是用的版本號,而只有true和false兩種狀態。相當於未修改和已修改
AtomicMarkableReference<User> amr = new AtomicMarkableReference(u1, true);
amr.compareAndSet(u1, u2, false, true);
System.out.println(amr.getReference());
}
static class User {
private String name;
public volatile int age;
public User(String name, int age) {
super();
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
}
5. 對象的屬性修改類型
如果需要原子更新某個類裏的某個字段時,需要用到對象的屬性修改類型原子類。
AtomicIntegerFieldUpdater:原子更新整形字段的更新器 AtomicLongFieldUpdater:原子更新長整形字段的更新器 AtomicReferenceFieldUpdater :原子更新引用類形字段的更新器
但是他們的使用通常有以下幾個限制:
- 限制1:操作的目標不能是static類型,前面説到的unsafe提取的是非static類型的屬性偏移量,如果是static類型在獲取時如果沒有使用對應的方法是會報錯的,而這個Updater並沒有使用對應的方法。
- 限制2:操作的目標不能是final類型的,因為final根本沒法修改。
- 限制3:必須是volatile類型的數據,也就是數據本身是讀一致的。
- 限制4:屬性必須對當前的Updater所在的區域是可見的,也就是private如果不是當前類肯定是不可見的,protected如果不存在父子關係也是不可見的,default如果不是在同一個package下也是不可見的。
實現方式:通過反射找到屬性,對屬性進行操作。
例子:
public class AtomicIntegerFieldUpdaterTest {
public static void main(String[] args) {
AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
User user = new User("Java", 22);
System.out.println(a.get(user));
System.out.println(a.getAndAdd(user,10));
System.out.println(a.get(user));
}
}
class User {
private String name;
public volatile int age;
public User(String name, int age) {
super();
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
6. JDK1.8新增類
LongAdder:長整型原子類 DoubleAdder:雙浮點型原子類 LongAccumulator:類似LongAdder,但要更加靈活(要傳入一個函數式接口) DoubleAccumulator:類似DoubleAdder,但要更加靈活(要傳入一個函數式接口)
LongAdder是jdk1.8提供的累加器,基於Striped64實現,所提供的API基本上可以替換原先的AtomicLong。
LongAdder類似於AtomicLong是原子性遞增或者遞減類,AtomicLong已經通過CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器來説性能已經很好了,但是JDK開發組並不滿足,因為在非常高的併發請求下AtomicLong的性能不能讓他們接受,雖然AtomicLong使用CAS但是CAS失敗後還是通過無限循環的自旋鎖不斷嘗試。
public final long incrementAndGet() {
for (;;) {
long current = get();
long next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
在高併發下N多線程同時去操作一個變量會造成大量線程CAS失敗然後處於自旋狀態,這大大浪費了cpu資源,降低了併發性。那麼既然AtomicLong性能由於過多線程同時去競爭一個變量的更新而降低的,那麼如果把一個變量分解為多個變量,讓同樣多的線程去競爭多個資源那麼性能問題不就解決了?是的,JDK8提供的LongAdder就是這個思路。下面通過圖形來標示兩者不同。
AtomicLong和LongAdder對比:
一段LongAdder和Atomic的對比測試代碼:
public class Demo9Compare {
public static void main(String[] args) {
AtomicLong atomicLong = new AtomicLong(0L);
LongAdder longAdder = new LongAdder();
long start = System.currentTimeMillis();
for (int i = 0; i < 50; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000000; j++) {
//atomicLong.incrementAndGet();
longAdder.increment();
}
}
}).start();
}
while (Thread.activeCount() > 2) {
}
System.out.println(atomicLong.get());
System.out.println(longAdder.longValue());
System.out.println("耗時:" + (System.currentTimeMillis() - start));
}
}
不同計算機因為CPU、內存等硬件不一樣,所以測試的數值也不一樣,但是得到的結論都是一樣的
測試結果:
從上結果圖可以看出,在併發比較低的時候,LongAdder和AtomicLong的效果非常接近。但是當併發較高時,兩者的差距會越來越大。上圖中在線程數為1000,每個線程循環數為100000時,LongAdder的效率是AtomicLong的6倍左右。
四、J.U.C之AQS
1. AQS簡介
AQS(AbstractQueuedSynchronizer),即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC併發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。它是JUC併發包中的核心基礎組件。
在這裏我們只是對AQS進行了解,它只是一個抽象類,但是JUC中的很多組件都是基於這個抽象類,也可以説這個AQS是多數JUC組件的基礎。
1.1 AQS的作用
Java的內置鎖一直都是備受爭議的,在JDK 1.6之前,synchronized這個重量級鎖其性能一直都是較為低下,雖然在1.6後,進行大量的鎖優化策略,但是與Lock相比synchronized還是存在一些缺陷的:它缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時獲取鎖,而且獨佔式在高併發場景下性能大打折扣。
AQS解決了實現同步器時涉及到的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。
1.2 state狀態
AQS維護了一個volatile int類型的變量state表示當前同步狀態。當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。
它提供了三個方法來對同步狀態state進行操作:
getState():返回同步狀態的當前值 setState():設置當前同步狀態 compareAndSetState():使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性
這三種操作均是CAS原子操作,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法
1.3 資源共享方式
AQS定義兩種資源共享方式:
- Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)
- Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)
不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
- isHeldExclusively():當前同步器是否在獨佔式模式下被線程佔用,一般該方法表示是否被當前線程所獨佔。只有用到condition才需要去實現它。
- tryAcquire(int):獨佔方式。嘗試獲取同步狀態,成功則返回true,失敗則返回false。其他線程需要等待該線程釋放同步狀態才能獲取同步狀態。
- tryRelease(int):獨佔方式。嘗試釋放同步狀態,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取同步狀態。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int):共享方式。嘗試釋放同步狀態,如果釋放後允許喚醒後續等待結點,返回true,否則返回false。
2. CLH同步隊列
AQS內部維護着一個FIFO隊列,該隊列就是CLH同步隊列,遵循FIFO原則( First Input First Output先進先出)。CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理。
當前線程如果獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構造成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。
2.1 入列
CLH隊列入列非常簡單,就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。
代碼我們可以看看addWaiter(Node node)方法:
private Node addWaiter(Node mode) {
//新建Node
Node node = new Node(Thread.currentThread(), mode);
//快速嘗試添加尾節點
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS設置尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次嘗試
enq(node);
return node;
}
在上面代碼中,兩個方法都是通過一個CAS方法compareAndSetTail(Node expect, Node update)來設置尾節點,該方法可以確保節點是線程安全添加的。在enq(Node node)方法中,AQS通過“死循環”的方式來保證節點可以正確添加,只有成功添加後,當前線程才會從該方法返回,否則會一直執行下去。
2.2 出列
CLH同步隊列遵循FIFO,首節點的線程釋放同步狀態後,將會喚醒它的後繼節點(next),而後繼節點將會在獲取同步狀態成功時將自己設置為首節點。head執行該節點並斷開原首節點的next和當前節點的prev即可,注意在這個過程是不需要使用CAS來保證的,因為只有一個線程能夠成功獲取到同步狀態。過程圖如下:
五、J.U.C之鎖
1. 鎖的基本概念
雖然在前面鎖優化的部分已經提到過一些鎖的概念,但不完全,這裏是對鎖的概念補充。
1.1 互斥鎖
在編程中,引入了對象互斥鎖的概念,來保證共享數據操作的完整性。每個對象都對應於一個可稱為" 互斥鎖" 的標記,這個標記用來保證在任一時刻,只能有一個線程訪問該對象。
1.2 阻塞鎖
阻塞鎖,可以説是讓線程進入阻塞狀態進行等待,當獲得相應的信號(喚醒,時間) 時,才可以進入線程的準備就緒狀態,準備就緒狀態的所有線程,通過競爭,進入運行狀態。
1.3 自旋鎖
自旋鎖是採用讓當前線程不停地的在循環體內執行實現的,當循環的條件被其他線程改變時,才能進入臨界區。
由於自旋鎖只是將當前線程不停地執行循環體,不進行線程狀態的改變,所以響應速度更快。但當線程數不停增加時,性能下降明顯,因為每個線程都需要執行,佔用CPU時間。如果線程競爭不激烈,並且保持鎖的時間段。適合使用自旋鎖。
1.4 讀寫鎖
讀寫鎖實際是一種特殊的自旋鎖,它把對共享資源的訪問者劃分成讀者和寫者,讀者只對共享資源進行讀訪問,寫者則需要對共享資源進行寫操作。
讀寫鎖相對於自旋鎖而言,能提高併發性,因為在多處理器系統中,它允許同時有多個讀者來訪問共享資源,最大可能的讀者數為實際的邏輯CPU數。寫者是排他性的,一個讀寫鎖同時只能有一個寫者或多個讀者(與CPU數相關),但不能同時既有讀者又有寫者。
1.5 公平鎖
公平鎖(Fair):加鎖前檢查是否有排隊等待的線程,優先排隊等待的線程,先來先得
非公平鎖(Nonfair):加鎖時不考慮排隊等待問題,直接嘗試獲取鎖,獲取不到自動到隊尾等待
非公平鎖性能比公平鎖高,因為公平鎖需要在多核的情況下維護一個隊列。
2. ReentrantLock
ReentrantLock,可重入鎖,是一種遞歸無阻塞的同步機制。它可以等同於synchronized的使用,但是ReentrantLock提供了比synchronized更強大、靈活的鎖機制,可以減少死鎖發生的概率。
ReentrantLock還提供了公平鎖和非公平鎖的選擇,構造方法接受一個可選的公平參數(默認非公平鎖),當設置為true時,表示公平鎖,否則為非公平鎖。公平鎖的效率往往沒有非公平鎖的效率高,在許多線程訪問的情況下,公平鎖表現出較低的吞吐量。
查看ReentrantLock源碼中的構造方法:
public ReentrantLock() {
//非公平鎖
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
//公平鎖
sync = fair ? new FairSync() : new NonfairSync();
}
Sync為ReentrantLock裏面的一個內部類,它繼承AQS(AbstractQueuedSynchronizer),它有兩個子類:公平鎖FairSync和非公平鎖NonfairSync。
2.1 獲取鎖
一般都是這麼使用ReentrantLock獲取鎖的:(默認非公平鎖)
//非公平鎖
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock方法:
public void lock() {
sync.lock();
}
加鎖最終可以看到會調用方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其實底層就是使用AQS同步隊列。
2.2 釋放鎖
獲取同步鎖後,使用完畢則需要釋放鎖,ReentrantLock提供了unlock釋放鎖:
public void unlock() {
sync.release(1);
}
unlock內部使用Sync的release()釋放鎖,release()是在AQS中定義的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
釋放同步狀態的tryRelease()是同步組件自己實現:
protected final boolean tryRelease(int releases) {
//減掉releases
int c = getState() - releases;
//如果釋放的不是持有鎖的線程,拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state == 0 表示已經釋放完全了,其他線程可以獲取同步狀態了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
只有當同步狀態徹底釋放後該方法才會返回true。當同步隊列的狀態state == 0 時,則將鎖持有線程設置為null,free= true,表示釋放成功。
2.3 公平鎖與非公平鎖原理
公平鎖與非公平鎖的區別在於獲取鎖的時候是否按照FIFO的順序來。釋放鎖不存在公平性和非公平性,比較非公平鎖和公平鎖獲取同步狀態的過程,會發現兩者唯一的區別就在於公平鎖在獲取同步狀態時多了一個限制條件:hasQueuedPredecessors(),定義如下:
public final boolean hasQueuedPredecessors() {
Node t = tail; //尾節點
Node h = head; //頭節點
Node s;
//頭節點 != 尾節點
//同步隊列第一個節點不為null
//當前線程是同步隊列第一個節點
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
該方法主要做一件事情:主要是判斷當前線程是否位於CLH同步隊列中的第一個。如果是則返回true,否則返回false。
2.4 ReentrantLock與synchronized的區別
前面提到ReentrantLock提供了比synchronized更加靈活和強大的鎖機制,那麼它的靈活和強大之處在哪裏呢?他們之間又有什麼相異之處呢?
1)與synchronized相比,ReentrantLock提供了更多,更加全面的功能,具備更強的擴展性。例如:時間鎖等候,可中斷鎖等候,鎖投票。
2)ReentrantLock還提供了條件Condition,對線程的等待、喚醒操作更加詳細和靈活,所以在多個條件變量和高度競爭鎖的地方,ReentrantLock更加適合(以後會闡述Condition)。
3)ReentrantLock提供了可輪詢的鎖請求。它會嘗試着去獲取鎖,如果成功則繼續,否則可以等到下次運行時處理,而synchronized則一旦進入鎖請求要麼成功要麼阻塞,所以相比synchronized而言,ReentrantLock會不容易產生死鎖些。
4)ReentrantLock支持更加靈活的同步代碼塊,但是使用synchronized時,只能在同一個synchronized塊結構中獲取和釋放。注:ReentrantLock的鎖釋放一定要在finally中處理,否則可能會產生嚴重的後果。
5)ReentrantLock支持中斷處理,且性能較synchronized會好些。
3. 讀寫鎖ReentrantReadWriteLock
可重入鎖ReentrantLock是互斥鎖,互斥鎖在同一時刻僅有一個線程可以進行訪問,但是在大多數場景下,大部分時間都是提供讀服務,而寫服務佔有的時間較少。然而讀服務不存在數據競爭問題,如果一個線程在讀時禁止其他線程讀勢必會導致性能降低。所以就提供了讀寫鎖。
讀寫鎖維護着一對鎖,一個讀鎖和一個寫鎖。通過分離讀鎖和寫鎖,使得併發性比一般的互斥鎖有了較大的提升:在同一時間可以允許多個讀線程同時訪問,但是在寫線程訪問時,所有讀線程和寫線程都會被阻塞。
讀寫鎖的主要特性:
- 公平性:支持公平性和非公平性。
- 重入性:支持重入。讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。
- 鎖降級:寫鎖能夠降級成為讀鎖,遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序。讀鎖不能升級為寫鎖。
讀寫鎖ReentrantReadWriteLock實現接口ReadWriteLock,該接口維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 線程同時保持。寫入鎖是獨佔的。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
ReadWriteLock定義了兩個方法。readLock()返回用於讀操作的鎖,writeLock()返回用於寫操作的鎖。ReentrantReadWriteLock定義如下:
/** 內部類 讀鎖 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 內部類 寫鎖 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 使用默認(非公平)的排序屬性創建一個新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}
/** 使用給定的公平策略創建一個新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用於寫入操作的鎖 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用於讀取操作的鎖 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略其餘源代碼
}
public static class WriteLock implements Lock, java.io.Serializable{
//省略其餘源代碼
}
public static class ReadLock implements Lock, java.io.Serializable {
//省略其餘源代碼
}
ReentrantReadWriteLock與ReentrantLock一樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的。所以ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不一樣而已,它的讀寫鎖其實就是兩個類:ReadLock、writeLock,這兩個類都是lock實現。
在ReentrantLock中使用一個int類型的state來表示同步狀態,該值表示鎖被一個線程重複獲取的次數。但是讀寫鎖ReentrantReadWriteLock內部維護着一對鎖,需要用一個變量維護多種狀態。所以讀寫鎖採用“按位切割使用”的方式來維護這個變量,將其切分為兩部分,高16為表示讀,低16為表示寫。分割之後,讀寫鎖是如何迅速確定讀鎖和寫鎖的狀態呢?通過位運算。假如當前同步狀態為S,那麼寫狀態等於 S & 0x0000FFFF(將高16位全部抹去),讀狀態等於S >>> 16(無符號補0右移16位)。代碼如下:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
3.1 寫鎖的獲取
寫鎖就是一個支持可重入的互斥鎖。
寫鎖的獲取最終會調用tryAcquire(int arg),該方法在內部類Sync中實現:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//當前鎖個數
int c = getState();
//寫鎖
int w = exclusiveCount(c);
if (c != 0) {
//c != 0 && w == 0 表示存在讀鎖
//當前線程不是已經獲取寫鎖的線程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//超出最大範圍
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
//是否需要阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//設置獲取鎖的線程為當前線程
setExclusiveOwnerThread(current);
return true;
}
該方法和ReentrantLock的tryAcquire(int arg)大致一樣,在判斷重入時增加了一項條件:讀鎖是否存在。因為要確保寫鎖的操作對讀鎖是可見的,如果在存在讀鎖的情況下允許獲取寫鎖,那麼那些已經獲取讀鎖的其他線程可能就無法感知當前寫線程的操作。因此只有等讀鎖完全釋放後,寫鎖才能夠被當前線程所獲取,一旦寫鎖開始獲取了,所有其他讀、寫線程均會被阻塞。
3.2 寫鎖的釋放
獲取了寫鎖用完了則需要釋放,WriteLock提供了unlock()方法釋放寫鎖:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
寫鎖的釋放最終還是會調用AQS的模板方法release(int arg)方法,該方法首先調用tryRelease(int arg)方法嘗試釋放鎖,tryRelease(int arg)方法為讀寫鎖內部類Sync中定義了,如下:
protected final boolean tryRelease(int releases) {
//釋放的線程不為鎖的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//若寫鎖的新線程數為0,則將鎖的持有者設置為null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
寫鎖釋放鎖的整個過程和互斥鎖ReentrantLock相似,每次釋放均是減少寫狀態,當寫狀態為0時表示 寫鎖已經完全釋放了,從而等待的其他線程可以繼續訪問讀寫鎖,獲取同步狀態,同時此次寫線程的修改對後續的線程可見。
3.3 讀鎖的獲取
讀鎖為一個可重入的共享鎖,它能夠被多個線程同時持有,在沒有其他寫線程訪問時,讀鎖總是獲取成功。
讀鎖的獲取可以通過ReadLock的lock()方法:
public void lock() {
sync.acquireShared(1);
}
Sync的acquireShared(int arg)定義在AQS中:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
3.4 讀鎖的釋放
與寫鎖相同,讀鎖也提供了unlock()釋放讀鎖:
public void unlock() {
sync.releaseShared(1);
}
unlcok()方法內部使用Sync的releaseShared(int arg)方法,該方法也定義AQS中:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3.5 鎖降級
讀寫鎖有一個特性就是鎖降級,鎖降級就意味着寫鎖是可以降級為讀鎖的。鎖降級需要遵循以下順序:
獲取寫鎖=>獲取讀鎖=>釋放寫鎖
3.6 讀寫鎖例子
public class Demo10ReentrantReadWriteLock {
private static volatile int count = 0;
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
WriteDemo writeDemo = new WriteDemo(lock);
ReadDemo readDemo = new ReadDemo(lock);
for (int i = 0; i < 3; i++) {
new Thread(writeDemo).start();
}
for (int i = 0; i < 5; i++) {
new Thread(readDemo).start();
}
}
static class WriteDemo implements Runnable {
ReentrantReadWriteLock lock;
public WriteDemo(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.writeLock().lock();
count++;
System.out.println("寫鎖:"+count);
lock.writeLock().unlock();
}
}
}
static class ReadDemo implements Runnable {
ReentrantReadWriteLock lock;
public ReadDemo(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.readLock().lock();
System.out.println("讀鎖:"+count);
lock.readLock().unlock();
}
}
}
}
六、J.U.C之Condition
1. Condition介紹
在沒有Lock之前,我們使用synchronized來控制同步,配合Object的wait()、notify()系列方法可以實現等待/通知模式。在JDK5後,Java提供了Lock接口,相對於Synchronized而言,Lock提供了條件Condition,對線程的等待、喚醒操作更加詳細和靈活。
下圖是Condition與Object的監視器方法的對比:
Condition提供了一系列的方法來對阻塞和喚醒線程:
- await() :造成當前線程在接到信號或被中斷之前一直處於等待狀態。
- await(long time, TimeUnit unit) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。
- awaitNanos(long nanosTimeout) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩餘時間,如果在nanosTimesout之前喚醒,那麼返回值 = nanosTimeout – 消耗時間,如果返回值 <= 0 ,則可以認定它已經超時了。
- awaitUninterruptibly() :造成當前線程在接到信號之前一直處於等待狀態。【注意:該方法對中斷不敏感】。
- awaitUntil(Date deadline) :造成當前線程在接到信號、被中斷或到達指定最後期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回true,否則表示到了指定時間,返回返回false。
- signal():喚醒一個等待線程。該線程從等待方法返回前必須獲得與Condition相關的鎖。
- signal()All:喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關的鎖。
Condition是一種廣義上的條件隊列(等待隊列)。他為線程提供了一種更為靈活的等待/通知模式,線程在調用await方法後執行掛起操作,直到線程等待的某個條件為真時才會被喚醒。Condition必須要配合鎖一起使用,因為對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,因此Condition一般都是作為Lock的內部實現。
案例:
public class Demo11Condition {
private Lock reentrantLock = new ReentrantLock();
private Condition condition1 = reentrantLock.newCondition();
private Condition condition2 = reentrantLock.newCondition();
public void m1() {
reentrantLock.lock();
try {
System.out.println("線程 " + Thread.currentThread().getName() + " 已經進入執行等待。。。");
condition1.await();
System.out.println("線程 " + Thread.currentThread().getName() + " 已被喚醒,繼續執行。。。");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void m2() {
reentrantLock.lock();
try {
System.out.println("線程 " + Thread.currentThread().getName() + " 已經進入執行等待。。。");
condition1.await();
System.out.println("線程 " + Thread.currentThread().getName() + " 已被喚醒,繼續執行。。。");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void m3() {
reentrantLock.lock();
try {
System.out.println("線程 " + Thread.currentThread().getName() + " 已經進入執行等待。。。");
condition2.await();
System.out.println("線程 " + Thread.currentThread().getName() + " 已被喚醒,繼續執行。。。");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void m4() {
reentrantLock.lock();
try {
System.out.println("線程 " + Thread.currentThread().getName() + " 已經進入發出condition1喚醒信號。。。");
condition1.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void m5() {
reentrantLock.lock();
try {
System.out.println("線程 " + Thread.currentThread().getName() + " 已經進入發出condition2喚醒信號。。。");
condition2.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public static void main(String[] args) throws Exception {
final Demo11Condition useCondition = new Demo11Condition();
Thread t1 = new Thread(new Runnable() {
public void run() {
useCondition.m1();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
public void run() {
useCondition.m2();
}
}, "t2");
Thread t3 = new Thread(new Runnable() {
public void run() {
useCondition.m3();
}
}, "t3");
Thread t4 = new Thread(new Runnable() {
public void run() {
useCondition.m4();
}
}, "t4");
Thread t5 = new Thread(new Runnable() {
public void run() {
useCondition.m5();
}
}, "t5");
t1.start();
t2.start();
t3.start();
Thread.sleep(2000);
t4.start();
Thread.sleep(2000);
t5.start();
}
}
2. Condition的實現
獲取一個Condition必須通過Lock的newCondition()方法。該方法定義在接口Lock下面,返回的結果是綁定到此 Lock 實例的新 Condition 實例。Condition為一個接口,其下僅有一個實現類ConditionObject,由於Condition的操作需要獲取相關的鎖,而AQS則是同步鎖的實現基礎,所以ConditionObject則定義為AQS的內部類。定義如下:
public class ConditionObject implements Condition, java.io.Serializable {
}
2.1 等待隊列
每個Condition對象都包含着一個FIFO隊列,該隊列是Condition對象通知/等待功能的關鍵。在隊列中每一個節點都包含着一個線程引用,該線程就是在該Condition對象上等待的線程。源碼如下:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
//頭節點
private transient Node firstWaiter;
//尾節點
private transient Node lastWaiter;
public ConditionObject() {
}
/** 省略方法 **/
}
從上面代碼可以看出Condition擁有首節點(firstWaiter),尾節點(lastWaiter)。當前線程調用await()方法,將會以當前線程構造成一個節點(Node),並將節點加入到該隊列的尾部。結構如下:
Node裏面包含了當前線程的引用。Node定義與AQS的CLH同步隊列的節點使用的都是同一個類(AbstractQueuedSynchronized.Node靜態內部類)。
Condition的隊列結構比CLH同步隊列的結構簡單些,新增過程較為簡單隻需要將原尾節點的nextWaiter指向新增節點,然後更新lastWaiter即可。
2.2 等待狀態
調用Condition的await()方法會使當前線程進入等待狀態,同時會加入到Condition等待隊列同時釋放鎖。當從await()方法返回時,當前線程一定是獲取了Condition相關連的鎖。
public final void await() throws InterruptedException {
// 當前線程中斷
if (Thread.interrupted())
throw new InterruptedException();
//當前線程加入等待隊列
Node node = addConditionWaiter();
//釋放鎖
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 檢測此節點的線程是否在同步隊上,如果不在,則説明該線程還不具備競爭鎖的資格,則繼續等待
* 直到檢測到此節點在同步隊列上
*/
while (!isOnSyncQueue(node)) {
//線程掛起
LockSupport.park(this);
//如果已經中斷了,則退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//競爭同步狀態
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下條件隊列中的不是在等待條件的節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
此段代碼的邏輯是:首先將當前線程新建一個節點同時加入到條件隊列中,然後釋放當前線程持有的同步狀態。然後則是不斷檢測該節點代表的線程釋放出現在CLH同步隊列中(收到signal信號之後就會在AQS隊列中檢測到),如果不存在則一直掛起,否則參與競爭同步狀態。
2.3 通知
調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列裏的首節點),在喚醒節點前,會將節點移到CLH同步隊列中。
public final void signal() {
//檢測當前線程是否為擁有鎖的獨
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//頭節點,喚醒條件隊列中的第一個節點
Node first = firstWaiter;
if (first != null)
doSignal(first); //喚醒
}
該方法首先會判斷當前線程是否已經獲得了鎖,這是前置條件。然後喚醒等待隊列中的頭節點。
doSignal(Node first):喚醒頭節點
private void doSignal(Node first) {
do {
//修改頭結點,完成舊頭結點的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal(Node first)主要是做兩件事:
1)修改頭節點
2)調用transferForSignal(Node first) 方法將節點移動到CLH同步隊列中。