condition 介紹及demo
Condition是在java 1.5中才出現的,它用來替代傳統的Object的wait()、notify()實現線程間的協作,相比使用Object的wait()、notify(),使用Condition的await()、signal()這種方式實現線程間協作更加安全和高效。因此通常來説比較推薦使用Condition,阻塞隊列實際上是使用了Condition來模擬線程間協作。
- Condition是個接口,基本的方法就是await()和signal()方法;
- Condition依賴於Lock接口,生成一個Condition的基本代碼是lock.newCondition()
- 調用Condition的await()和signal()方法,都必須在lock保護之內,就是説必須在lock.lock()和lock.unlock之間才可以使用
Conditon中的await()對應Object的wait();
Condition中的signal()對應Object的notify();
Condition中的signalAll()對應Object的notifyAll()。
下面是demo:
[java] view plain copy
1. package thread;
2.
3. import java.util.concurrent.locks.Condition;
4. import java.util.concurrent.locks.Lock;
5. import java.util.concurrent.locks.ReentrantLock;
6. /**
7. *
8. * @author zhangliang
9. *
10. * 2016年4月8日 下午5:48:54
11. */
12. public class ConTest {
13.
14. final Lock lock = new ReentrantLock();
15. final Condition condition = lock.newCondition();
16.
17. public static void main(String[] args) {
18. // TODO Auto-generated method stub
19. new ConTest();
20. new Producer();
21. new Consumer();
22.
23.
24. consumer.start();
25. producer.start();
26. }
27.
28. class Consumer extends Thread{
29.
30. @Override
31. public void run() {
32. consume();
33. }
34.
35. private void consume() {
36.
37. try {
38. lock.lock();
39. "我在等一個新信號"+this.currentThread().getName());
40. condition.await();
41.
42. catch (InterruptedException e) {
43. // TODO Auto-generated catch block
44. e.printStackTrace();
45. finally{
46. "拿到一個信號"+this.currentThread().getName());
47. lock.unlock();
48. }
49.
50. }
51. }
52.
53. class Producer extends Thread{
54.
55. @Override
56. public void run() {
57. produce();
58. }
59.
60. private void produce() {
61. try {
62. lock.lock();
63. "我拿到鎖"+this.currentThread().getName());
64. condition.signalAll();
65. "我發出了一個信號:"+this.currentThread().getName());
66. finally{
67. lock.unlock();
68. }
69. }
70. }
71.
72. }
運行結果:
Condition的執行方式,是當在線程Consumer中調用await方法後,線程Consumer將釋放鎖,並且將自己沉睡,等待喚醒,線程Producer獲取到鎖後,開始做事,完畢後,調用Condition的signalall方法,喚醒線程Consumer,線程Consumer恢復執行。
以上説明Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,從而重新爭奪鎖。
Condition實現生產者、消費者模式:
[java] view plain copy
1. package thread;
2.
3. import java.util.PriorityQueue;
4. import java.util.concurrent.locks.Condition;
5. import java.util.concurrent.locks.Lock;
6. import java.util.concurrent.locks.ReentrantLock;
7.
8. public class ConTest2 {
9. private int queueSize = 10;
10. private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
11. private Lock lock = new ReentrantLock();
12. private Condition notFull = lock.newCondition();
13. private Condition notEmpty = lock.newCondition();
14.
15. public static void main(String[] args) throws InterruptedException {
16. new ConTest2();
17. new Producer();
18. new Consumer();
19. producer.start();
20. consumer.start();
21. 0);
22. producer.interrupt();
23. consumer.interrupt();
24. }
25.
26. class Consumer extends Thread{
27. @Override
28. public void run() {
29. consume();
30. }
31. volatile boolean flag=true;
32. private void consume() {
33. while(flag){
34. lock.lock();
35. try {
36. while(queue.isEmpty()){
37. try {
38. "隊列空,等待數據");
39. notEmpty.await();
40. catch (InterruptedException e) {
41. false;
42. }
43. }
44. //每次移走隊首元素
45. notFull.signal();
46. "從隊列取走一個元素,隊列剩餘"+queue.size()+"個元素");
47. finally{
48. lock.unlock();
49. }
50. }
51. }
52. }
53.
54. class Producer extends Thread{
55. @Override
56. public void run() {
57. produce();
58. }
59. volatile boolean flag=true;
60. private void produce() {
61. while(flag){
62. lock.lock();
63. try {
64. while(queue.size() == queueSize){
65. try {
66. "隊列滿,等待有空餘空間");
67. notFull.await();
68. catch (InterruptedException e) {
69.
70. false;
71. }
72. }
73. 1); //每次插入一個元素
74. notEmpty.signal();
75. "向隊列取中插入一個元素,隊列剩餘空間:"+(queueSize-queue.size()));
76. finally{
77. lock.unlock();
78. }
79. }
80. }
81. }
82. }
運行結果如下:
condition實現分析:
- Condition接口包含了多種await方式和兩個通知方法
- ConditionObject實現了Condition接口,是AbstractQueuedSynchronizer的內部類
- Reentrantlock的newCondition方法返回與某個lock實例相關的Condition對象
[java] view plain copy
1. public abstract class AbstractQueuedLongSynchronizer
2. extends AbstractOwnableSynchronizer
3. implements java.io.Serializable {
[java] view plain copy
1. <span style="font-size:18px;">結合上面的類圖,我們看到condition實現是依賴於aqs,而aqs是個抽象類。裏面定義了同步器的基本框架,實現了基本的結構功能。只留有狀態條件的維護由具體同步器根據具體場景來定製,如常見的 ReentrantLock 、 RetrantReadWriteLock和CountDownLatch 等等,AQS內容太多,儘量只簡明梳理condition相關流程,不太深入理解底層源碼。</span>
下面結合上面demo來分析流程。
reentrantLock.newCondition() 返回的是Condition的一個實現,該類在AbstractQueuedSynchronizer(AQS)中被實現,叫做newCondition()
[java] view plain copy
1. public Condition newCondition() {
2. return sync.newCondition();
3. }
我們看一下這個await的方法,它是AQS的方法,
publicfinalvoid await() throws InterruptedException {
02
if (Thread.interrupted())
03
thrownew InterruptedException();
04
Node node = addConditionWaiter(); //將當前線程包裝下後,
05
//添加到Condition自己維護的一個鏈表中。
06
int savedState = fullyRelease(node);//釋放當前線程佔有的鎖,從demo中看到,
07
//調用await前,當前線程是佔有鎖的
08
09
int interruptMode = 0;
10
while (!isOnSyncQueue(node)) {//釋放完畢後,遍歷AQS的隊列,看當前節點是否在隊列中,
11
//不在 説明它還沒有競爭鎖的資格,所以繼續將自己沉睡。
12
//直到它被加入到隊列中,聰明的你可能猜到了,
13
//沒有錯,在singal的時候加入不就可以了?
14
LockSupport.park(this);
15
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
16
break;
17
}
18
//被喚醒後,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。
19
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
20
interruptMode = REINTERRUPT;
21
if (node.nextWaiter != null)
22
unlinkCancelledWaiters();
23
if (interruptMode != 0)
24
reportInterruptAfterWait(interruptMode);
25
}
回到上面的demo,鎖被釋放後,線程Consumer開始沉睡,這個時候線程因為線程Consumer沉睡時,會喚醒AQS隊列中的頭結點,所所以線程Producer會開始競爭鎖,並獲取到,執行完後線程Producer會調用signal方法,“發出”signal信號,signal方法如下:
1
publicfinalvoid signal() {
2
if (!isHeldExclusively())
3
thrownew IllegalMonitorStateException();
4
Node first = firstWaiter; //firstWaiter為condition自己維護的一個鏈表的頭結點,
5
//取出第一個節點後開始喚醒操作
6
if (first != null)
7
doSignal(first);
8
}
説明下,其實Condition內部維護了等待隊列的頭結點和尾節點,該隊列的作用是存放等待signal信號的線程,該線程被封裝為Node節點後存放於此。
而Condition自己也維護了一個隊列,該隊列的作用是維護一個等待signal信號的隊列,兩個隊列的作用是不同,事實上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:
注意:
1.線程producer調用signal方法,這個時候Condition的等待隊列中只有線程Consumer一個節點,於是它被取出來,並被加入到AQS的等待隊列中。 注意,這個時候,線程Consumer 並沒有被喚醒。
2.Sync是AQS的抽象子類,實現可重入和互斥的大部分功能。在Sync的子類中有FairSync和NonfairSync兩種代表公平鎖策略和非公平鎖策略。Sync lock方法留給子類去實現,NonfairSync的實現:
[java] view plain copy
1. final void lock() {
2. if (compareAndSetState(0, 1))
3. setExclusiveOwnerThread(Thread.currentThread());
4. else
5. 1);
6. }
其中如果一開始獲取鎖成功,是直接設置當前線程。
否則執行acquire(1),也就是進入aqs等待隊列。這裏不展開細節。
可以這樣理解,整個協作過程是靠結點在AQS的等待隊列和Condition的等待隊列中來回移動實現的,每個隊列的意義不同,Condition作為一個條件類,很好的自己維護了一個等待信號的隊列,並在適時的時候將結點加入到AQS的等待隊列中來實現的喚醒操作
本文先整理到這裏吧。
後記:
梳理本文的過程比較痛苦,為什麼呢?因為我沒有吃透這一塊,發現牽扯的很多,腦子很亂,有廣度又有深度,感覺沒法梳理,決定一點一點去啃,從淺入深的去梳理,從鎖,同步,阻塞隊列,併發容器開始,到依賴的底層aqs\原子變量,再到更底層的volatile、cas。其中aqs是其中的關鍵,很多j.u.c的包是圍繞它實現的。目標就是會用,熟悉原理,讀懂源碼,寫出demo,關鍵地方梳理出流程圖,加油!