博客 / 詳情

返回

ReentrantLock源碼解析 | 京東雲技術團隊

併發指同一時間內進行了多個線程。併發問題是多個線程對同一資源進行操作時產生的問題。通過加鎖可以解決併發問題,ReentrantLock是鎖的一種。

1 ReentrantLock

1.1 定義

ReentrantLock是Lock接口的實現類,可以手動的對某一段進行加鎖。ReentrantLock可重入鎖,具有可重入性,並且支持可中斷鎖。其內部對鎖的控制有兩種實現,一種為公平鎖,另一種為非公平鎖.

1.2 實現原理

ReentrantLock的實現原理為volatile+CAS。想要説明volatile和CAS首先要説明JMM。

1.2.1 JMM

JMM(java 內存模型 Java Memory Model 簡稱JMM) 本身是一個抽象的概念,並不在內存中真實存在的,它描述的是一組規範或者規則,通過這組規範定義了程序中各個變量的訪問方式.

由於 JMM 運行程序的實體是線程.而每個線程創建時JMM都會為其創建一個自己的工作內存(棧空間),工作內存是每個線程的私有 數據區域.而java內存模型中規定所有的變量都存儲在主內存中,主內存是共享內存區域,所有線程都可以訪問,但線程的變量的操作(讀取賦值等)必須在自己的工作內存中去進行,首先要 將變量從主存拷貝到自己的工作內存中,然後對變量進行操作,操作完成後再將變量操作完後的新值寫回主內存,不能直接操作主內存的變量,各個線程的工作內存中存儲着主內存的變量拷貝的副本,因不同的線程間無法訪問對方的工作內存,線程間的通信必須在主內存來完成。

如圖所示:線程A對變量A的操作,只能是從主內存中拷貝到線程中,再寫回到主內存中。

1.2.2 volatile

volatile 是JAVA的關鍵字用於修飾變量,是java虛擬機的輕量同步機制,volatile不能保證原子性。
作用:

  • 線程可見性:一個變量在某個線程裏修改了它的值,如果使用了volatile關鍵字,那麼別的線程可以馬上讀到修改後的值。
  • 指令重排序:沒加之前,指令是併發執行的,第一個線程執行到一半另一個線程可能開始執行了。加了volatile關鍵字後,不同線程是按照順序一步一步執行的。1.2.3 CASCAS是Compare and Swap,就是比較和交換,而比較和交換是一個原子操作。線程基於CAS修改數據的方式:先獲取主內存數據,在修改之前,先比較數據是否一致,如果一致修改主內存數據,如果不一致,放棄這次修改。

作用:CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對內存執行讀-改-寫操作。1.2.4 AQSAQS的全稱是AbstractQueuedSynchronizer(抽象的隊列式的同步器),AQS定義了一套多線程訪問共享資源的同步器框架。

AQS主要包含兩部分內容:共享資源和等待隊列。AQS底層已經對這兩部分內容提供了很多方法。

  • 共享資源:共享資源是一個volatile的int類型變量。
  • 等待隊列:等待隊列是一個線程安全的隊列,當線程拿不到鎖時,會被park並放入隊列。

2 源碼解析

ReentrantLock在包java.util.concurrent.locks下,實現Lock接口。

2.1 lock方法

lock分為公平鎖和非公平鎖。

公平鎖:

final void lock() {
    acquire(1);
}

非公平鎖:上來先嚐試將state從0修改為1,如果成功,代表獲取鎖資源。如果沒有成功,調用acquire。state是AQS中的一個由volatile修飾的int類型變量,多個線程會通過CAS的方式修改state,在併發情況下,只會有一個線程成功的修改state。

final void lock() {
//通過原子方式修改值
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}


 /**
 * 獲取鎖的線程.
 */
 private transient Thread exclusiveOwnerThread;


  /**
  * 設置擁有鎖的線程
  */
  protected final void setExclusiveOwnerThread(Thread thread) {
      exclusiveOwnerThread = thread;

2.2 acquire方法

acquire是一個業務方法,裏面並沒有實際的業務處理,都是在調用其他方法。

public final void acquire(int arg) {
    //調用tryAcquire方法:嘗試獲取鎖資源(非公平、公平),拿到鎖資源,返回true,直接結束方法
    if (!tryAcquire(arg) &&
    //當沒有獲取鎖資源後,會先調用addWaiter:會將沒有獲取到鎖資源的線程封裝為Node對象,
    //並且插入到AQS的隊列的末尾.
   //繼續調用acquireQueued方法,查看當前排隊的Node是否在隊列的前面,如果在前面,嘗試獲取鎖資源
    //如果沒在前面,嘗試將線程掛起,阻塞起來!
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();
}

2.3 tryAcquire方法

tryAcquire分為公平和非公平兩種。

公平:

 protected final boolean tryAcquire(int acquires) {
   //拿到當前線程
    final Thread current = Thread.currentThread();
   //拿到AQS的state
     int c = getState();
   // 如果state == 0,説明沒有線程佔用着當前的鎖資源
     if (c == 0) {
   //如果沒有線程排隊,直接直接CAS嘗試獲取鎖資源
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
     //如果獲取資源成功,將當前線程設置為持有鎖資源的線程
       setExclusiveOwnerThread(current);
        return true;
          }
        }
     //如果有線程持有鎖資源,判斷持有鎖資源的線程是否是當前線程
         else if (current == getExclusiveOwnerThread()) {
      //增加AQS的state的值
            int nextc = c + acquires;
             if (nextc < 0)
             throw new Error("Maximum lock count exceeded");
              setState(nextc);
               return true;
            }
          return false;
        }
    }

非公平:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
   //拿到當前線程
    final Thread current = Thread.currentThread();
   //拿到AQS的state
     int c = getState();
   // 如果state == 0,説明沒有線程佔用着當前的鎖資源
      if (c == 0) {
   //獲取鎖資源
      if (compareAndSetState(0, acquires)) {
   //將當前佔用這個互斥鎖的線程屬性設置為當前線程
      setExclusiveOwnerThread(current);
       return true;
       }
       }
   //如果有線程持有鎖資源,判斷持有鎖資源的線程是否是當前線程
       else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
          }
         return false;
  }

2.4 addWaiter方法

在獲取鎖資源失敗後,需要將當前線程封裝為Node對象,並且插入到AQS隊列的末尾。

private Node addWaiter(Node mode) {
    // 將當前線程封裝為Node對象,mode為null,代表互斥鎖
    Node node = new Node(Thread.currentThread(), mode);
    // pred是tail節點
    Node pred = tail;
    // 如果pred不為null,有線程正在排隊
    if (pred != null) {
        // 將當前節點的prev,指定tail尾節點
        node.prev = pred;
        // 以CAS的方式,將當前節點變為tail節點
        if (compareAndSetTail(pred, node)) {
            // 之前的tail的next指向當前節點
            pred.next = node;
            return node;
        }
    }
    // 添加的流程為,  自己prev指向、tail指向自己、前節點next指向我
    // 如果上述方式,CAS操作失敗,導致加入到AQS末尾失敗,如果失敗,就基於enq的方式添加到AQS隊列
    enq(node);
    return node;
}
// enq,無論怎樣都添加進入
private Node enq(final Node node) {
    for (;;) {
        // 拿到tail
        Node t = tail;
        // 如果tail為null,説明當前沒有Node在隊列中
        if (t == null) { 
            // 創建一個新的Node作為head,並且將tail和head指向一個Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 和上述代碼一致!
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

2.5 acquireQueued方法

// acquireQueued方法
// 查看當前排隊的Node是否是head的next,
// 如果是,嘗試獲取鎖資源,
// 如果不是或者獲取鎖資源失敗那麼就嘗試將當前Node的線程掛起(unsafe.park())
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        for (;;) {
            // 拿到上一個節點
            final Node p = node.predecessor();
            if (p == head && // 説明當前節點是head的next
                tryAcquire(arg)) { // 競爭鎖資源,成功:true,失敗:false
                // 進來説明拿到鎖資源成功
                // 將當前節點置位head,thread和prev屬性置位null
                setHead(node);
                // 幫助快速GC
                p.next = null; 
                // 設置獲取鎖資源成功
                failed = false;
                // 不管線程中斷。
                return interrupted;
            }
            // 如果不是或者獲取鎖資源失敗,嘗試將線程掛起
            // 第一個事情,當前節點的上一個節點的狀態正常!
            // 第二個事情,掛起線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 通過LockSupport將當前線程掛起
                parkAndCheckInterrupt())
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


// 確保上一個節點狀態是正確的
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 拿到上一個節點的狀態
    int ws = pred.waitStatus;
    // 如果上一個節點為 -1
    if (ws == Node.SIGNAL)
        // 返回true,掛起線程
        return true;
    // 如果上一個節點是取消狀態
    if (ws > 0) {
        // 循環往前找,找到一個狀態小於等於0的節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 將小於等於0的節點狀態該為-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

2.6 unlock方法

釋放鎖資源,將state減1,如果state減為0了,喚醒在隊列中排隊的Node。

public final boolean release(int arg) {
    // 核心的釋放鎖資源方法
    if (tryRelease(arg)) {
        // 釋放鎖資源釋放乾淨了。  (state == 0)
        Node h = head;
        // 如果頭節點不為null,並且頭節點的狀態不為0,喚醒排隊的線程
        if (h != null && h.waitStatus != 0)
            // 喚醒線程
            unparkSuccessor(h);
        return true;
    }
    // 釋放鎖成功,但是state != 0
    return false;
}
// 核心的釋放鎖資源方法
protected final boolean tryRelease(int releases) {
    // 獲取state - 1
    int c = getState() - releases;
    // 如果釋放鎖的線程不是佔用鎖的線程,拋異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否成功的將鎖資源釋放利索 (state == 0)
    boolean free = false;
    if (c == 0) {
        // 鎖資源釋放乾淨。
        free = true;
        // 將佔用鎖資源的屬性設置為null
        setExclusiveOwnerThread(null);
    }
    // 將state賦值
    setState(c);
    // 返回true,代表釋放乾淨了
    return free;
}


// 喚醒節點
private void unparkSuccessor(Node node) {
    // 拿到頭節點狀態
    int ws = node.waitStatus;
    // 如果頭節點狀態小於0,換為0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 拿到當前節點的next
    Node s = node.next;
    // 如果s == null ,或者s的狀態為1
    if (s == null || s.waitStatus > 0) {
        // next節點不需要喚醒,需要喚醒next的next
        s = null;
        // 從尾部往前找,找到狀態正常的節點。(小於等於0代表正常狀態)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 經過循環的獲取,如果拿到狀態正常的節點,並且不為null
    if (s != null)
        // 喚醒線程
        LockSupport.unpark(s.thread);
}

3 使用實例

3.1 公平鎖

1.代碼:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        new Thread(()->test(lock),"線程A").start();
        new Thread(()->test(lock),"線程B").start();
        new Thread(()->test(lock),"線程C").start();
    }
    public static   void test(ReentrantLock lock){
        for (int i = 0; i < 3;i++){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"獲取了鎖!");
                Thread.sleep(200);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
}

2.執行結果:

3.小結:

公平鎖可以保證每個線程獲取鎖的機會是相等的。

3.2 非公平鎖

1.代碼:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(()->test(lock),"線程A").start();
        new Thread(()->test(lock),"線程B").start();
        new Thread(()->test(lock),"線程C").start();
    }
    public static   void test(ReentrantLock lock){
        for (int i = 0; i < 3;i++){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"獲取了鎖!");
                Thread.sleep(200);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
}

2.執行結果:

3.小結:

非公平鎖每個線程獲取鎖的機會是隨機的。

3.3 忽略重複操作

1.代碼:

public class ReentrantLockTest {
    private ReentrantLock lock = new ReentrantLock();


    public void doSomething(){
        if(lock.tryLock()){
            try {
                System.out.println(Thread.currentThread().getName()+"獲取了鎖!");
                Thread.sleep(5);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        ReentrantLockTest test = new ReentrantLockTest();
        for (int i = 0; i < 10;i++){
            new Thread(()->{test.doSomething();},"線程"+i).start();
            Thread.sleep(1);
        }
    }
}

2.執行結果:

3.小結:

當線程持有鎖時,不會重複執行,可以用來防止定時任務重複執行或者頁面事件多次觸發時不會重複觸發。

3.4 超時不執行

1.代碼:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(()->test(lock),"線程A").start();
        new Thread(()->test(lock),"線程B").start();
    }
    public static   void test(ReentrantLock lock){
            try {
                if(lock.tryLock(2, TimeUnit.SECONDS)){
                    try {
                        System.out.println(Thread.currentThread().getName()+"獲取了鎖!");
                        Thread.sleep(3000);
                    }finally {
                        lock.unlock();
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }


    }

2.執行結果:

3.小結:

超時不執行可以防止由於資源處理不當長時間佔用資源產生的死鎖問題。

4 總結

併發是現在軟件系統不可避免的問題,ReentrantLock是可重入的獨佔鎖,比起synchronized功能更加豐富,支持公平鎖實現,支持中斷響應以及限時等待等,是處理併發問題很好的解決方案。

作者:京東物流 陳昌浩

來源:京東雲開發者社區

user avatar tanking 頭像
1 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.