博客 / 詳情

返回

ZooKeeper實現分佈式鎖

基礎

ZooKeeper的4個節點

  • 持久節點:默認的節點類型,一直存在於ZooKeeper中
  • 持久順序節點:在創建節點時,ZooKeeper根據節點創建的時間順序對節點進行編號
  • 臨時節點:當客户端與ZooKeeper斷開連接後,該進程創建的臨時節點就會被刪除
  • 臨時順序節點:按時間順序編號的臨時節點

ZK分佈式鎖相關基礎知識

  • zk分佈式鎖一般由多個節點構成(單數),採用 zab 一致性協議。因此可以將 zk 看成一個單點結構,對其修改數據其內部自動將所有節點數據進行修改而後才提供查詢服務。
  • zk 的數據以目錄樹的形式,每個目錄稱為 znode, znode 中可存儲數據(一般不超過 1M),還可以在其中增加子節點。
  • 子節點有三種類型。序列化節點,每在該節點下增加一個節點自動給該節點的名稱上自增。臨時節點,一旦創建這個 znode 的客户端與服務器失去聯繫,這個 znode 也將自動刪除。最後就是普通節點。
  • Watch 機制,client 可以監控每個節點的變化,當產生變化會給 client 產生一個事件。

實現原理

核心思想

核心思想就是基於 臨時順序節點 和 Watcher(事件監聽器) 實現的。

當客户端要獲取鎖,則創建節點,使用完鎖,則刪除該節點。

  1. 客户端獲取鎖時,在lock節點下創建臨時順序節點。

    1. 臨時是防止客户端宕機後,無法正常刪除鎖的情況
    2. 使用順序節點,是因為所有嘗試獲取鎖的客户端都會對持有鎖的子節點加監聽器。當該鎖被釋放之後,勢必會造成所有嘗試獲取鎖的客户端來爭奪鎖,這樣對性能不友好。使用順序節點之後,只需要監聽前一個節點就好了,對性能更友好
  2. 然後獲取lock下面的所有子節點,客户端獲取到所有的子節點之後,如果發現自己創建的子節點序號最小,那麼就認為該客户端獲取到了鎖。使用完鎖後,將該節點刪除。
  3. 如果發現自己創建的節點並非lock所有子節點中最小的,説明自己還沒有獲取到鎖,此時客户端需要找到比自己小的那個節點,同時對其註冊事件監聽器,監聽刪除事件。
  4. 如果發現比自己小的那個節點被刪除,則客户端的Watcher會收到相應通知,此時再次判斷自己創建的節點是否是lock子節點中序號最小的,如果是則獲取到了鎖,如果不是則重複以上步驟繼續獲取到比自己小的一個節點並註冊監聽。

獲取鎖步驟:

  1. 在 /lock 節點下創建一個有序臨時節點 (EPHEMERAL_SEQUENTIAL)。
  2. 判斷創建的節點序號是否最小,如果是最小則獲取鎖成功。不是則取鎖失敗,然後 watch 序號比本身小的前一個節點。
  3. 當取鎖失敗,設置 watch 後則等待 watch 事件到來後,再次判斷是否序號最小。
  4. 取鎖成功則執行代碼,最後釋放鎖(刪除該節點)

釋放鎖步驟:

  1. 成功獲取鎖的客户端在執行完業務流程之後,會將對應的子節點刪除。
  2. 成功獲取鎖的客户端在出現故障之後,對應的子節點由於是臨時順序節點,也會被自動刪除,避免了鎖無法被釋放。
  3. 事件監聽器其實監聽的就是這個子節點刪除事件,子節點刪除就意味着鎖被釋放。

羊羣效應和解決方法

  • 羊羣效應

    • 在整個分佈式鎖的競爭過程中,大量的「Watcher通知」和「子節點列表的獲取」操作重複運行,並且大多數節點的運行結果都是判斷出自己當前並不是編號最小的節點,繼續等待下一次通知,而不是執行業務邏輯
    • 這就會對 ZooKeeper 服務器造成巨大的性能影響和網絡衝擊。更甚的是,如果同一時間多個節點對應的客户端完成事務或事務中斷引起節點消失,ZooKeeper 服務器就會在短時間內向其他客户端發送大量的事件通知
  • 解決方法

    • 在與該方法對應的持久節點的目錄下,為每個進程創建一個臨時順序節點
    • 每個進程獲取所有臨時節點列表,對比自己的編號是否最小,若最小,則獲得鎖。
    • 若本進程對應的臨時節點編號不是最小的,則繼續判斷

      • 若本進程為讀請求,則向比自己序號小的最後一個寫請求節點註冊watch監聽,當監聽到該節點釋放鎖後,則獲取鎖
      • 若本進程為寫請求,則向比自己序號小的最後一個讀請求節點註冊watch監聽,當監聽到該節點釋放鎖後,獲取鎖

實現

實際項目中,推薦使用 Curator 來實現 ZooKeeper 分佈式鎖。Curator 是 Netflix 公司開源的一套 ZooKeeper Java 客户端框架,相比於 ZooKeeper 自帶的客户端 zookeeper 來説,Curator 的封裝更加完善,各種 API 都可以比較方便地使用。

原生API實現

/**
 * 自己本身就是一個 watcher,可以得到通知
 * AutoCloseable 實現自動關閉,資源不使用的時候
 */
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {

    private ZooKeeper zooKeeper;

    /**
     * 記錄當前鎖的名字
     */
    private String znode;

    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("localhost:2181",
                10000,this);
    }

    public boolean getLock(String businessCode) {
        try {
            //創建業務 根節點
            Stat stat = zooKeeper.exists("/" + businessCode, false);
            if (stat==null){
                zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }

            //創建瞬時有序節點  /order/order_00000001
            znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            //獲取業務節點下 所有的子節點
            List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
            //獲取序號最小的(第一個)子節點
            Collections.sort(childrenNodes);
            String firstNode = childrenNodes.get(0);
            //如果創建的節點是第一個子節點,則獲得鎖
            if (znode.endsWith(firstNode)){
                return true;
            }
            //如果不是第一個子節點,則監聽前一個節點
            String lastNode = firstNode;
            for (String node:childrenNodes){
                if (znode.endsWith(node)){
                    zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
                    break;
                }else {
                    lastNode = node;
                }
            }
            synchronized (this){
                wait();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void close() throws Exception {
        zooKeeper.delete(znode,-1);
        zooKeeper.close();
        log.info("我已經釋放了鎖!");
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }
}

Curator實現

Curator有五種鎖:

  • InterProcessSemaphoreMutex:分佈式排它鎖(非可重入鎖)
  • InterProcessMutex:分佈式可重入排它鎖
  • InterProcessReadWriteLock:分佈式讀寫鎖
  • InterProcessMultiLock:將多個鎖作為單個實體管理的容器
  • InterProcessSemaphoreV2:共享信號量
CuratorFramework client = ZKUtils.getClient();
client.start();
// 分佈式可重入排它鎖
InterProcessLock lock1 = new InterProcessMutex(client, lockPath1);
// 分佈式不可重入排它鎖
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2);
// 將多個鎖作為一個整體
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

if (!lock.acquire(10, TimeUnit.SECONDS)) {
   throw new IllegalStateException("不能獲取多鎖");
}
System.out.println("已獲取多鎖");
System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
try {
    // 資源操作
    resource.use();
} finally {
    System.out.println("釋放多個鎖");
    lock.release();
}
System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
client.close();

Curator實現可重入鎖

當調用 InterProcessMutex#acquire方法獲取鎖的時候,會調用InterProcessMutex#internalLock方法。

// 獲取可重入互斥鎖,直到獲取成功為止
@Override
public void acquire() throws Exception {
  if (!internalLock(-1, null)) {
    throw new IOException("Lost connection while trying to acquire lock: " + basePath);
  }
}

internalLock 方法會先獲取當前請求鎖的線程,然後從 threadData( ConcurrentMap<Thread, LockData> 類型)中獲取當前線程對應的 lockData 。 lockData 包含鎖的信息和加鎖的次數,是實現可重入鎖的關鍵。

第一次獲取鎖的時候,lockData為 null。獲取鎖成功之後,會將當前線程和對應的 lockData 放到 threadData 中

private boolean internalLock(long time, TimeUnit unit) throws Exception {
  // 獲取當前請求鎖的線程
  Thread currentThread = Thread.currentThread();
  // 拿對應的 lockData
  LockData lockData = threadData.get(currentThread);
  // 第一次獲取鎖的話,lockData 為 null
  if (lockData != null) {
    // 當前線程獲取過一次鎖之後
    // 因為當前線程的鎖存在, lockCount 自增後返回,實現鎖重入.
    lockData.lockCount.incrementAndGet();
    return true;
  }
  // 嘗試獲取鎖
  String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  if (lockPath != null) {
    LockData newLockData = new LockData(currentThread, lockPath);
     // 獲取鎖成功之後,將當前線程和對應的 lockData 放到 threadData 中
    threadData.put(currentThread, newLockData);
    return true;
  }

  return false;
}

LockData是 InterProcessMutex中的一個靜態內部類。

private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

private static class LockData
{
    // 當前持有鎖的線程
    final Thread owningThread;
    // 鎖對應的子節點
    final String lockPath;
    // 加鎖的次數
    final AtomicInteger lockCount = new AtomicInteger(1);

    private LockData(Thread owningThread, String lockPath)
    {
      this.owningThread = owningThread;
      this.lockPath = lockPath;
    }
}

如果已經獲取過一次鎖,後面再來獲取鎖的話,直接就會在 if (lockData != null) 這裏被攔下了,然後就會執行lockData.lockCount.incrementAndGet(); 將加鎖次數加 1。

整個可重入鎖的實現邏輯非常簡單,直接在客户端判斷當前線程有沒有獲取鎖,有的話直接將加鎖次數加 1 就可以了。

案例-模擬12306售票

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class Ticket12306 implements Runnable{

    private int tickets = 10;//數據庫的票數

    private InterProcessMutex lock ;

    public Ticket12306(){
        //重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //2.第二種方式
        //CuratorFrameworkFactory.builder();
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();

        //開啓連接
        client.start();

        lock = new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {
        while(true){
            //獲取鎖
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if(tickets > 0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //釋放鎖
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

測試方法:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class LockTest {

    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();

        //創建客户端
        Thread t1 = new Thread(ticket12306,"去哪兒");
        Thread t2 = new Thread(ticket12306,"飛豬");

        t1.start();
        t2.start();
    }
}

優缺點

優點:

  1. 可靠性:ZooKeeper 是一個高可用的分佈式協調服務,基於它的分佈式鎖具有較高的可靠性和穩定性。
  2. 順序性: ZooKeeper 的有序臨時節點保證了鎖的獲取順序,避免了死鎖和競爭問題。
  3. 避免死鎖:在鎖的持有者釋放鎖之前,其他節點無法獲取鎖,從而避免了死鎖問題。
  4. 容錯性:即使部分節點發生故障,其他節點仍然可以正常獲取鎖,保證了系統的穩定性。

缺點:

  1. 性能:ZooKeeper 是一箇中心化的協調服務,可能在高併發場景下成為性能瓶頸。
  2. 複雜性:ZooKeeper 的部署和維護相對複雜,需要一定的運維工作。
  3. 單點故障:儘管 ZooKeeper 本身是高可用的,但如果 ZooKeeper 集羣出現問題,可能會影響到基於它的分佈式鎖。

有序臨時節點的機制確保了獲取鎖的順序,避免了循環等待,從而有效地避免了死鎖問題。因為任何一個客户端在釋放鎖之前都會刪除自己的節點,從而觸發下一個等待的客户端獲取鎖。

需要注意的是,這種機制雖然能夠有效避免死鎖,但也可能帶來性能問題。當某個客户端釋放鎖時,需要觸發所有等待的客户端獲取鎖,可能會導致較多的網絡通信和監聽事件。因此,在高併發情況下,需要綜合考慮性能和鎖的可靠性。

總的來説,基於 ZooKeeper 的分佈式鎖能夠確保數據一致性和鎖的可靠性,但需要權衡性能和複雜性。在選擇時,需要根據具體場景來決定是否使用該種鎖機制。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.