1. Curator 介紹
Apache Curator 是一個用於簡化 Apache ZooKeeper 使用的 Java 庫。它提供了一組高層次的 API 和實用工具,幫助開發者更輕鬆地與 ZooKeeper 集成並使用其功能。ZooKeeper 本身是一個強大的分佈式協調服務,但其原生 API 相對底層,並且在處理連接管理、重試機制和會話恢復等方面需要較多的手動處理。Curator 旨在解決這些問題,通過更易用的接口和高級抽象來提高開發效率和代碼的可維護性。
1.1. 類似 Redisson
Curator 之於 ZooKeeper,就像 Redisson 之於 Redis。 這裏使用 Redisson 而不是 Jedis,是因為 Jedis 提供的是比較原生的API,而 Redisson 封裝了更豐富的功能(如直接提供分佈式鎖、限流器等類)。
同樣 ZooKeeper 也有類似於 Jedis 的工具 - Apache ZooKeeper Java Client 。同樣也是 ZooKeeper 官方提供的 Java 客户端,提供基礎的 ZooKeeper 服務訪問和操作功能,相對而言比較原生的API。
1.2. 主要特性
1、連接管理
Curator 提供了靈活的連接管理,包括自動重試和複雜的重試策略,如指數退避(重試次數增加後,重試間隔時間指數級增加)。
它可以自動處理會話過期和重連,從而減少開發者的工作量。其內建的重試機制和會話恢復功能,提高了應用程序的可靠性。
2.節點及數據管理
Curator 提供 ZooKeeper 中原生的節點及數據管理,如:創建、刪除節點,更新數據等,但 watch 監聽進一步做了包裝。
3.Recipe 實現
Curator 實現了一些常見的分佈式系統模式(稱為 Recipes),這包括:領導選舉、分佈式鎖、共享計數器、分佈式隊列、分佈式屏障等。
這些模式經過精心設計和測試,可以直接用於生產環境中。
2. 連接管理
使用 Curator 的第一步就是要創建 ZooKeeper 客户端的連接,用法上都是先創建 CuratorFramework 對象,然後執行 start() 方法啓動客户端,初始化連接。
2.1. 基礎創建
示例代碼:
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181",
new RetryOneTime(1000)
);
client.start();
CuratorFrameworkFactory.newClient 是創建 Curator 客户端最簡單的方法,就兩個參數:
- 服務器連接:ZooKeeper 服務器連接字符串。
- 重試策略:這裏使用的是 RetryOneTime(1000),表示如果連接失敗,則在 1000 毫秒後重試一次。
2.2. 高階創建
高階地創建CuratorFramework客户端,就需要用到 CuratorFrameworkFactory.Builder 了。可以實現更多的定製化選項,如設置會話超時、連接超時、重試策略以及命名空間。
示例代碼:
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(100, 3 , 1000))
.namespace("myNamespace")
.authorization("digest", "user:password".getBytes())
.build();
client.start();
CuratorFrameworkFactory.builder() 創建了一個可定製的構建器。
.connectString("localhost:2181"): 設置了 ZooKeeper 服務器的連接字符串。.sessionTimeoutMs(5000): 設置了會話超時時間為 5000 毫秒。這意味着如果客户端超過 5 秒沒有與 ZooKeeper 服務器交互,那麼會話將過期。.connectionTimeoutMs(5000): 設置了連接超時時間為 5000 毫秒。如果客户端在 5 秒內無法成功連接到 ZooKeeper 服務器,則會拋出異常。.retryPolicy(new ExponentialBackoffRetry(100, 3 , 1000)): 設置了重試策略為指數退避策略。在這種情況下,首次重試間隔為 100 毫秒,最多重試 3 次,最大重試間隔不超過 1000 毫秒。.namespace("myNamespace"): 設置了命名空間為 myNamespace。這有助於避免不同應用之間的節點命名衝突。.authorization("digest", "user:password".getBytes()): 設置了客户端的安全認證信息。在這個例子中,使用了 Digest 認證方式,並指定了用户名和密碼。
2.3. 重連策略
高階使用的例子中有提到自動重試的策略 .retryPolicy(new ExponentialBackoffRetry(1000, 3)),且提到為指數退避策略。
自動重試很好理解,當因為網絡等原因導致連接中斷時,客户端會嘗試重新連接。那什麼是指數退避策略呢。
指數退避(Exponential Backoff)
是一種在網絡編程和分佈式系統中常用的重試策略,用於處理失敗的請求或連接。它的核心思想是在每次重試之間等待一個逐漸增加的時間間隔,以減少連續重試對系統造成的壓力。
工作原理:
- 初始重試間隔:指數退避策略通常從一個較小的初始時間間隔開始,例如 100 毫秒。
- 逐步增加重試間隔:每次重試失敗後,下一個重試的時間間隔會按照指數級增長。例如,第一次重試間隔為 100 毫秒,第二次為 200 毫秒,第三次為 400 毫秒,依此類推。
- 最大重試間隔:為了避免無限增長的時間間隔導致長時間等待,通常會設定一個最大重試間隔。例如,最大重試間隔可能設置為 1 分鐘。
- 隨機化:為了進一步分散請求,可以在每個重試間隔的基礎上添加一定的隨機偏移量,以避免多個客户端同時重試導致的“重試風暴”。
解釋 ExponentialBackoffRetry(100, 3, 1000)
ExponentialBackoffRetry(100, 3, 1000 創建一個指數退避策略實例,其中:
- 100: 初始重試間隔為 100 毫秒。
- 3: 最多重試 3 次。
- 1000: 最大重試間隔為 1000 毫秒(1 秒)。
指數退避的重試間隔計算方法通常是根據指數級增長的方式來增加時間間隔,每次重試間隔是按照前一次的基礎上乘以 2 來計算的,但它也會被限制在最大重試間隔之內。對於這個具體例子,重試間隔時間會是:
- 第一次重試:間隔是初始值 100 毫秒。
- 第二次重試:間隔是 100 * 2 = 200 毫秒。
- 第三次重試:間隔是 100 * 4 = 400 毫秒。
在這個例子中,雖然我們設定了最大間隔為 1000 毫秒,但由於最多隻重試 3 次,所以實際的間隔不會達到最大設定值 1000 毫秒。如果重試次數增加,那麼一旦增長的間隔時間超過 1000 毫秒,它將不會繼續增長,因為 1000 是設置的最大間隔限制。
2.4. 命名空間
命名空間是 Curator 客户端在邏輯上包裝的概念,ZooKeeper 並未提供。
當你在 Curator 中定義了一個命名空間後,Curator 會自動在該命名空間下對所有的 ZooKeeper 操作進行相對路徑處理。也就是説,你在 Curator 客户端中定義路徑時,無需考慮命名空間的前綴,Curator 會自動幫你在實際操作時加上。
例如上述 .namespace("myNamespace"),定義了 myNamespace 的命名空間,當我們基於該客户端連接創建一個 /node1 節點。
client.create().forPath("/node1", "data".getBytes());
這行代碼將在實際的 ZooKeeper 路徑 /myNamespace/node1 上創建一個節點,而不是 /node1。
應用場景:
- 隔離性:通過命名空間,能夠將不同的應用或模塊在 ZooKeeper 上的操作隔離開來,避免節點名稱衝突。
- 簡化管理:命名空間使得節點路徑管理更加簡單和直觀。在創建和管理節點時,開發者無需手動添加命名空間前綴,減少了出錯的可能性。
- 遷移和版本控制:在應用升級或遷移過程中,可以為新版本或測試環境創建單獨的命名空間,方便對比和驗證。通過命名空間,也可以輕鬆實現同一應用的不同版本共存。
3. 節點及數據管理
下面例子中假設已經定義好了 CuratorFramework 變量 client。
3.1. 基本操作
3.1.1. 創建
示例:
String path = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath("/example/path", "initial_data".getBytes());
解釋:
- create(): 創建一個節點。如果節點已經存在則拋出異常。
- creatingParentsIfNeeded(): 如果父節點不存在,則會自動創建父節點。
- forPath:第一個參數為路徑,第一個參數為節點值。
-
withMode(): 設置節點類型(持久節點、臨時節點)。
- 持久節點 (CreateMode.PERSISTENT)
- 臨時節點 (CreateMode.EPHEMERAL)
- 持久有序節點 (CreateMode.PERSISTENT_SEQUENTIAL)
- 臨時有序節點 (CreateMode.EPHEMERAL_SEQUENTIAL)
3.1.2. 刪除
示例:
client.delete()
.guaranteed()
.deletingChildrenIfNeeded()
.forPath("/example/path");
解釋:
- delete(): 刪除一個節點。
- deletingChildrenIfNeeded(): 刪除節點以及其所有子節點。
- guaranteed(): 保證節點刪除,即使連接中斷後,也會在重新連接後繼續嘗試刪除節點。
3.1.3. 更新
示例:
client.setData()
.forPath("/example/path", "updated_data".getBytes());
解釋:
- setData(): 設置節點的數據。如果節點不存在將拋出異常。
3.1.4. 獲取節點值
示例:
byte[] data = client.getData().forPath("/example/path");
System.out.println("Node data: " + new String(data));
解釋:
- getData(): 獲取節點的數據。
3.1.5. 檢查節點是否存在
示例:
Stat stat = client.checkExists().forPath("/example/path");
if (stat != null) {
System.out.println("Node exists at: /example/path");
} else {
System.out.println("Node does not exist at: /example/path");
}
解釋:
- checkExists(): 檢查節點是否存在。
3.1.6. 獲取所有子節點
示例:
List<String> children = client.getChildren().forPath(parentPath);
解釋:
- getChildren().forPath(parentPath): 調用這個方法來獲取指定路徑下的子節點。返回的 List<String> 包含所有子節點的名稱,但不包含路徑。
注意
判斷一個節點是否有子節點,也是通過這個方法,然後通過 children 集合是否為空來判斷。
3.2. 版本管理
在 ZooKeeper 中,每個節點都有一個版本編號,用於記錄對該節點數據的修改次數。每次更新節點數據時,版本號會遞增。
在更新數據、刪除節點等操作上,使用版本號可以實現樂觀鎖,確保數據的一致性和正確性。
1、版本號的作用
- 數據一致性:確保只有在你知道節點當前版本的情況下才能更新數據。這防止了舊數據覆蓋新數據的情況。
- 樂觀鎖:通過比較和設置版本號來實現樂觀鎖控制。
2、使用版本號的操作
- 獲取節點數據和版本號:使用 client.getData().storingStatIn(stat).forPath(path); 可以同時獲取節點的數據和版本信息。
- 更新節點數據時指定版本號:使用 client.setData().withVersion(stat.getVersion()).forPath(path, "new_data".getBytes()); 指定版本號進行更新。
- 錯誤版本號的更新嘗試:如果指定的版本號不匹配,則拋出異常。這是檢測並避免數據衝突的一種方式。
3、示例代碼
以下是如何在 Curator 中使用 ZooKeeper 節點的版本號來確保數據更新一致性的示例。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class CuratorVersioningExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
String path = "/example/versioned_node";
// 創建一個持久節點
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded().forPath(path, "initial_data".getBytes());
}
// 獲取節點的數據和版本號
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(path);
System.out.println("Current data: " + new String(data));
System.out.println("Current version: " + stat.getVersion());
// 嘗試更新節點數據(使用正確的版本號進行更新)
try {
client.setData().withVersion(stat.getVersion()).forPath(path, "new_data".getBytes());
System.out.println("Data updated successfully with correct version.");
} catch (Exception e) {
System.out.println("Failed to update data: " + e.getMessage());
}
// 獲取更新後的數據和版本號
data = client.getData().storingStatIn(stat).forPath(path);
System.out.println("Updated data: " + new String(data));
System.out.println("Updated version: " + stat.getVersion());
// 嘗試使用錯誤的版本號更新節點數據
try {
client.setData().withVersion(stat.getVersion() - 1).forPath(path, "incorrect_version_data".getBytes());
System.out.println("Data updated with incorrect version (unexpected).");
} catch (Exception e) {
System.out.println("Failed to update data with incorrect version: " + e.getMessage());
}
client.close();
}
}
4、withVersion
.withVersion() 方法在 Apache Curator 中主要用於兩類操作:節點數據的更新和節點的刪除。這兩個操作都是與 ZooKeeper 數據節點的版本號直接相關的操作,利用版本號可以實現樂觀鎖機制,確保操作的原子性和數據的一致性。
節點數據更新 (setData):
client.setData().withVersion(version).forPath(path, newData);
節點刪除 (delete):
client.delete().withVersion(version).forPath(path);
4. Recipe 實現
Apache Curator 是一個用於簡化 Apache ZooKeeper 使用的高層次客户端庫。它不僅提供了 ZooKeeper 原生 API 的封裝,還提供了一系列稱為 "Recipes" 的高級工具,這些工具實現了常見的分佈式系統的設計模式。這些 Recipes 大大簡化了使用 ZooKeeper 進行分佈式協調的複雜性。
一些常用的 Curator Recipes 如:Leader Election(領導選舉)、Shared Reentrant Lock(可重入鎖)、Barrier(屏障)、Cache(緩存)、DistributedQueue(分佈式隊列)等。
Cache 即 NodeCache等緩存,單起一章講。
4.1. Leader Election(領導選舉)
領導者選舉在分佈式系統中非常重要,因為它允許多個進程協調並選擇一個進程作為“主”進程來執行特定任務。
1、示例
代碼示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLeaderElectionExample {
public static void main(String[] args) throws Exception {
String zkConnectionString = "localhost:2181";
String leaderPath = "/example/leader";
// 創建並啓動 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(
zkConnectionString,
new ExponentialBackoffRetry(1000, 3)
);
client.start();
// 創建 LeaderSelector 實例
LeaderSelector leaderSelector = new LeaderSelector(client, leaderPath, new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("I am the leader now!");
// 在這裏執行主節點相關的任務
Thread.sleep(3000); // 模擬領導者任務執行
System.out.println("Releasing leadership");
}
});
// 自動重新排隊,確保在領導權釋放後能夠重新競選
leaderSelector.autoRequeue();
leaderSelector.start();
// 讓主線程保持運行狀態以觀察選舉過程
Thread.sleep(Long.MAX_VALUE);
// 省略 close 代碼
}
}
代碼解釋:
- LeaderSelector: 用於執行領導者選舉的核心類。LeaderSelectorListenerAdapter 提供了 takeLeadership 方法,當節點成為領導者時調用。
- autoRequeue: 確保噹噹前領導者釋放領導權後,該實例可以重新參與選舉。
- takeLeadership: 這是當節點成為領導者時執行的邏輯。在這裏,我們模擬執行了一些任務,然後釋放領導權。
2、實現原理
ZooKeeper 的領導者選舉通常是基於臨時順序節點(ephemeral sequential nodes)實現的。以下是基本的流程和原理:
- 創建臨時順序節點:每個參與選舉的客户端都會在一個特定的“選舉”路徑下創建一個臨時順序節點。
- 獲取子節點列表並排序:每個客户端獲取這個路徑下的所有子節點,並按順序排列。
- 判斷自己是否為序號最小的節點:如果某個客户端創建的節點是序號最小的節點,則它成為領導者。
- 監聽前一個節點:如果不是最小的節點,則監聽比自己小的那個節點(前一個節點)。如果前一個節點消失(意味着持有該節點的客户端故障或斷開連接),那麼當前節點檢查自己是否成為了新的最小節點。
- 重複過程:由於創建的是臨時節點,客户端斷開連接時節點會自動刪除,所以當領導者斷開連接時,會觸發新的選舉。
這種機制可以保證在分佈式系統中選出一個唯一的領導者,並且在領導者失效時能夠迅速選出新的領導者。Curator 對這一原理進行了封裝,使得實現領導者選舉更加簡潔和易於使用。
4.2. Shared Reentrant Lock(可重入鎖)
使用 ZooKeeper 實現分佈式鎖是一個常見的用例,而 Apache Curator 提供了 InterProcessMutex 來實現共享的可重入鎖。這種鎖機制確保了在分佈式系統中,多個客户端可以安全地訪問共享資源。
1、示例
代碼示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorSharedReentrantLockExample {
public static void main(String[] args) throws Exception {
String zkConnectionString = "localhost:2181";
String lockPath = "/example/lock";
// 創建並啓動 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(
zkConnectionString,
new ExponentialBackoffRetry(1000, 3)
);
client.start();
// 創建一個 InterProcessMutex 實例
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
// 嘗試獲取鎖
if (lock.acquire(10, java.util.concurrent.TimeUnit.SECONDS)) {
try {
System.out.println("Lock acquired, performing some work...");
// 在這裏進行鎖定區間內的工作
Thread.sleep(5000);
} finally {
// 確保釋放鎖
lock.release();
System.out.println("Lock released");
}
} else {
System.out.println("Failed to acquire lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
client.close();
}
}
}
代碼解釋:
- InterProcessMutex: 這是 Curator 提供的可重入鎖實現。它確保在分佈式系統中多個客户端能夠安全地獲取和釋放鎖。
- acquire: 嘗試獲取鎖。在這個示例中,我們設置了一個超時時間(10秒)。
- release: 確保在完成工作後釋放鎖。
2、實現原理
ZooKeeper 的分佈式鎖通常基於臨時順序節點實現,其原理如下:
- 創建臨時順序節點: 每個想要獲取鎖的客户端在一個特定的鎖路徑下創建一個臨時順序節點。
- 獲取子節點列表並排序: 每個客户端獲取這個路徑下的所有子節點,並按順序排列。
- 判斷自己是否為序號最小的節點: 如果某個客户端創建的節點是最小的節點,則它獲得鎖。
- 監聽前一個節點: 如果不是最小的節點,則監聽比自己小的那個節點(前一個節點)。如果前一個節點消失(意味着持有該節點的客户端釋放了鎖),那麼當前節點檢查自己是否成為了新的最小節點,從而獲得鎖。
- 重複過程: 客户端斷開連接時,臨時節點會自動刪除,從而觸發鎖的重新分配。
Curator 的 InterProcessMutex 對這一過程進行了封裝,自動處理節點的創建、排序、監聽和刪除,使得開發者不必手動實現這些細節。這種機制確保了在分佈式系統中,鎖可以安全、可靠地被多個進程共享和控制。
4.3. Barrier(屏障)
在分佈式計算中,Barrier 是一種同步原語,用於確保一組參與進程在執行某個操作之前必須等待所有其他進程都到達 Barrier。這類似於一個起跑線上的賽跑者,必須等到所有人都準備好之後才能一起出發。
1、示例
代碼示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorDistributedBarrierExample {
public static void main(String[] args) throws Exception {
String zkConnectionString = "localhost:2181";
String barrierPath = "/example/barrier";
// 創建並啓動 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(
zkConnectionString,
new ExponentialBackoffRetry(1000, 3)
);
client.start();
// 創建一個 DistributedBarrier 實例
DistributedBarrier barrier = new DistributedBarrier(client, barrierPath);
try {
System.out.println("Waiting at the barrier...");
// 等待所有參與者到達 Barrier
barrier.waitAtBarrier(10, java.util.concurrent.TimeUnit.SECONDS);
System.out.println("All participants have reached the barrier. Proceeding...");
// 在這裏進行需要同步的操作
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
client.close();
}
}
}
代碼解釋:
- CuratorFramework: 創建並啓動一個 Curator 客户端以連接到 ZooKeeper。
- DistributedBarrier: 這是 Curator 提供的 Barrier 實現。它確保在分佈式系統中多個客户端可以安全地等待所有參與者到達 Barrier。
- waitAtBarrier: 嘗試等待所有參與者到達 Barrier。在這個示例中,我們設置了一個超時時間(10秒)。
- finally: 確保在完成操作後關閉 Curator 客户端。
2、實現原理
假設我們有兩個參與者 A 和 B,它們需要同時執行某個操作:
- 參與者 A 到達:A 在 /example/barrier 下創建一個臨時節點。
- 參與者 B 到達:B 在 /example/barrier 下也創建一個臨時節點。
- 兩個參與者都到達:此時,屏障節點 /example/barrier 已經有兩個子節點,代表兩個參與者都已到達。
- 刪除屏障節點:最後一個到達的參與者(假設是 B)刪除屏障節點 /example/barrier。
- 觸發監聽器:屏障節點的刪除觸發所有參與者的監聽器,告知它們可以繼續執行下一步。
通過這種方式,ZooKeeper 的 Barrier 實現確保了所有參與者都可以同步地執行下一步操作,這對於分佈式系統中的同步操作非常有用。
4.4. DistributedQueue(分佈式隊列)
1、示例
Curator 提供了一些高級原語來簡化在 ZooKeeper 上實現分佈式隊列的過程,例如 DistributedQueue。
代碼示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
public class CuratorDistributedQueueExample {
public static void main(String[] args) throws Exception {
String zkConnectionString = "localhost:2181";
String queuePath = "/example/queue";
// 創建並啓動 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(
zkConnectionString,
new ExponentialBackoffRetry(1000, 3)
);
client.start();
// 創建一個 DistributedQueue 實例
DistributedQueue<String> queue = QueueBuilder.builder(
client,
null, // 這裏可以設置一個消費者
(message) -> { System.out.println("Consumed: " + message); }, // 消費者邏輯
queuePath
).buildQueue();
queue.start();
// 生產者 - 向隊列中添加消息
queue.put("Message 1");
queue.put("Message 2");
// 這裏可以模擬消費者進行消費
// 優雅關閉隊列和客户端
Thread.sleep(10000);
queue.close();
client.close();
}
}
代碼解釋:
- DistributedQueue: 使用 QueueBuilder 創建一個 DistributedQueue 實例。可以指定一個消費者邏輯來處理隊列中的消息。
- put: 向隊列中添加消息,模擬生產者的行為。
- 消費者邏輯: 在 QueueBuilder 中設置消費者邏輯,處理從隊列中取出的消息。
- queue.close() 和 client.close(): 確保在操作完成後關閉隊列和 Curator 客户端。
2、實現原理
在 ZooKeeper 中,實現一個分佈式隊列的原理通常基於有序節點和監聽機制:
- 有序節點: 利用 ZooKeeper 的有序節點特性,每個加入隊列的元素在一個指定路徑下創建一個有序節點,這樣可以自動確保元素的順序。
- 生產者: 通過在指定路徑下創建有序子節點來添加元素。例如,生產者在 /queue 路徑下依次創建 /queue/element-0000000001、/queue/element-0000000002 等節點。
- 消費者: 監聽 /queue 路徑的子節點變化,獲取所有子節點列表並排序,取出序號最小的節點進行消費。
- 消費後刪除節點: 消費者在處理完節點中的數據後,刪除該節點以從隊列中移除該任務。
5. 節點監聽
Curator 提供了多個工具用於 ZooKeeper 節點監聽(watch),常見的有:NodeCache、PathChildrenCache 和 TreeCache,這裏主要講解 NodeCache。
NodeCache 是 Apache Curator 提供的一種實用工具,用於監聽 ZooKeeper 中某個特定節點的數據變化。它在客户端中維護了該節點的一個本地緩存,當節點的數據發生變化時,NodeCache 會自動更新緩存,並通知註冊的監聽器。這種機制非常適合需要實時監控節點數據變化的場景。
5.1. NodeCache 使用示例
NodeCache 的使用步驟
- 創建 CuratorFramework 實例並啓動。
- 創建 NodeCache 實例。
- 註冊監聽器以響應節點數據變化。
- 啓動 NodeCache。
- 處理完後關閉 NodeCache 和 CuratorFramework。
代碼示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class NodeCacheExample {
private static final String ZK_ADDRESS = "localhost:2181"; // ZooKeeper 連接地址
private static final String ZNODE_PATH = "/example/nodeCache"; // 監聽的節點路徑
public static void main(String[] args) throws Exception {
// 1. 創建 CuratorFramework 實例
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new ExponentialBackoffRetry(1000, 3)
);
// 2. 啓動 CuratorFramework 客户端
client.start();
// 3. 創建 NodeCache 實例
NodeCache nodeCache = new NodeCache(client, ZNODE_PATH);
// 4. 註冊監聽器
NodeCacheListener listener = () -> {
byte[] newData = nodeCache.getCurrentData().getData();
System.out.println("Node data updated, new data: " + new String(newData));
};
nodeCache.getListenable().addListener(listener);
// 5. 啓動 NodeCache
nodeCache.start(true);
// 模擬等待
System.out.println("Listening for data changes on node: " + ZNODE_PATH);
Thread.sleep(100000);
// 6. 關閉 NodeCache 和 CuratorFramework 客户端
nodeCache.close();
client.close();
}
}
當 NodeCache 啓動後,本地 NodeCache 中數據會自動更新變化。當如果我們需要自定義邏輯,當數據變化時處理對應邏輯,則需要註冊NodeCacheListener。其中 nodeCache.getCurrentData().getData() 為更新後的數據,如果值為 null,可能意味着當前節點已被刪除。
5.2. start、close
1、start(boolean buildInitialCache)
start(true):當參數為 true 時,NodeCache 會在啓動時立即嘗試從 ZooKeeper 獲取指定節點的初始數據,並將其存儲在本地緩存中。這意味着一旦 NodeCache 啓動,監聽器將立即收到節點的初始數據更新事件。start(false):當參數為 false 時,NodeCache 不會在啓動時立即嘗試獲取節點的初始數據。它將僅在節點數據實際發生變化時才更新本地緩存,並觸發監聽器。這意味着如果在 NodeCache 啓動後節點數據尚未變化,則監聽器不會接收到任何更新事件,直到節點數據真正發生變化。
2、start 方法步驟
start() 方法執行了幾個重要操作,以便開始監聽指定的節點路徑,並維護其數據的本地緩存:
- 初始化數據:在啓動時,NodeCache 會嘗試從 ZooKeeper 中獲取指定節點的初始數據。如果節點存在且可訪問,它將緩存該數據。
- 註冊監聽器:NodeCache 會在 ZooKeeper 上註冊一個監聽器,用於監聽該節點的變化事件,包括節點數據的更新、節點的創建和刪除。
- 啓動背景線程:NodeCache 會啓動必要的後台線程,以處理來自 ZooKeeper 的事件並更新緩存。
3、close 方法步驟
close() 方法用於釋放 NodeCache 實例所佔用的資源,並停止其所有後台活動和監聽器。這是一個關鍵的方法,確保在不再需要使用 NodeCache 時,正確地關閉並清理資源。具體來説,close() 方法的作用包括:
- 停止監聽器:close() 方法會停止 NodeCache 中註冊的所有監聽器,不再接收來自 ZooKeeper 的事件更新。這意味着任何節點數據的變化都將不再通知給應用程序。
- 釋放資源:NodeCache 使用了一些資源,包括線程、網絡連接和內存等。close() 方法確保這些資源被釋放,以避免資源泄漏。
- 確保線程安全:關閉 NodeCache 可以確保在應用程序結束或不再需要節點監控時,系統資源被適當地釋放,防止後台線程繼續運行或進入不確定的狀態。
4、不能只關閉 CuratorFramework 而不關閉 NodeCache
在結束對 NodeCache 的使用或 curatorFramework.close() 之前,確保調用 nodeCache.close() 來釋放資源和停止後台線程。
否則會導致的問題:
- 資源泄漏:如果你不手動關閉 NodeCache 而僅僅關閉 CuratorFramework 客户端,可能會造成資源泄漏。雖然 CuratorFramework 的關閉會終止與 ZooKeeper 的連接,但 NodeCache 本身的資源(如線程或其他結構)可能沒有被適當釋放。
- 異常行為:由於 NodeCache 依賴於 CuratorFramework 的連接來接收事件和數據更新,關閉 CuratorFramework 後,NodeCache 可能會嘗試繼續操作而沒有有效的連接,這可能導致異常或者不可預測的行為。
- 不必要的線程活動:如果 NodeCache 的內部線程未關閉,它們可能會繼續運行,浪費系統資源。
5.3. 實現原理
ZooKeeper 的原生 watch 機制是一次性的。這意味着當一個事件觸發 watch 後,watch 就會被移除,必須重新註冊以繼續監聽後續事件。
但 Curator NodeCache 框架通過封裝和管理這些細節,實現了持久的節點監聽。以下是 NodeCache 持久監聽的核心原理:
- 自動重置 watch:當 NodeCache 接收到一個節點事件時,它會自動重新註冊 watch,以確保持續監聽節點的後續變化。這是通過在事件回調中重新設置 watch 實現的。
- 事件處理:NodeCache 使用 Curator 提供的方法來註冊對節點的監聽。Curator 內部會通過一個事件循環來處理來自 ZooKeeper 的事件。當節點的數據發生變化時,Curator 會接收到通知,並將事件分派給 NodeCache 的監聽器。
- 緩存管理:NodeCache 維護一個緩存來存儲節點的當前數據。當節點數據發生變化時,緩存會更新,並且任何註冊的監聽器都會被觸發來處理這個變化。
- 後台線程:NodeCache 使用一個後台線程來處理這些事件和緩存更新,因此應用程序的主線程不會被阻塞。
- 錯誤處理和重試機制:Curator 本身提供了重試機制來處理臨時的連接問題(例如網絡故障)。NodeCache 利用這種機制在遇到臨時問題時自動重試,以確保持續的連接和事件監聽。
5.4. 網絡中斷導致的問題
1、watch中斷(解決)
因為 ZooKeeper 的 watch 是一次性的,需要在每次事件觸發後重新設置。在使用 ZooKeeper 的原生 watch 機制時,如果在處理事件後還未重新註冊 watch 就發生網絡故障,可能會導致後續的事件無法監聽到。
然而,Curator 框架通過其高級封裝和管理,極大地緩解了這一問題:
- 自動重連:Curator 內部管理 ZooKeeper 客户端的會話和連接狀態。如果檢測到網絡故障或連接中斷,它會自動進行重連。
- 重新註冊:在重連期間,Curator 會嘗試重新建立所有必要的 watch。這包括在 NodeCache 中用於監聽節點變化的 watch。
2、數據一致性(解決)
如果客户端在網絡中斷期間, ZooKeeper 服務端數據發生變更,客户端是沒有監聽到變更消息的。在客户端重連後,是否會導致數據不一致?
不會的,如上述 Curator 在重連後會重建所有必要的 watch。我們可以在 CuratorFramework 註冊的重連監聽器中調用 NodeCache.getCurrentData() 主動更新值。或者最初 NodeCache.start(true) 參數設置為 true。
3、事件丟失(未解決)
上述在網絡中斷重連後,可以保障數據最終一致性。但在中斷期間watch 的數據變更事件是否會重新推送呢?不會,ZooKeeper 不像 Etcd,沒有事件回放功能。
如果業務上需要基於 NodeCacheListener 變更事件做處理,可能會有依賴。所以業務上需要合理設計,規避這種影響,或者是否可以在重連事件中增加處理。
5.5. PathChildrenCache
NodeCache 只用於監聽指定節點自身的變化,而 PathChildrenCache 是專門用於監聽指定節點的子節點變化,包括子節點的添加、刪除和數據更新。
示例代碼:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
public class PathChildrenCacheExample {
public static void main(String[] args) {
String zookeeperConnectionString = "localhost:2181";
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zookeeperConnectionString)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
PathChildrenCache childrenCache = new PathChildrenCache(client, "/example/path", true);
PathChildrenCacheListener listener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("Child added: " + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("Child updated: " + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("Child removed: " + event.getData().getPath());
break;
default:
break;
}
}
};
childrenCache.getListenable().addListener(listener);
try {
childrenCache.start(PathChildrenCache.StartMode.NORMAL);
// 運行一段時間,等待事件發生
Thread.sleep(10000); // 示例中運行10秒
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關閉資源
try {
childrenCache.close();
} catch (Exception e) {
e.printStackTrace();
}
client.close();
}
}
}
5.6. TreeCache
TreeCache 是一個更高級的工具,可以監聽一個節點及其所有子節點(遞歸)的變化。它結合了 NodeCache 和 PathChildrenCache 的功能,是一個強大的工具用於監控整個子樹的變化。
在 TreeCache 的例子中,監聽事件包括了節點的添加、更新和刪除,這些事件既可能來自於子節點,也可能來自於自身節點。事實上,TreeCache 是用來監聽整個樹結構的變化,包括目標節點及其所有子節點的變化。
設是對路徑 /example/path 進行監聽,示例中可以區分是自身節點還是子節點:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
public class TreeCacheExample {
public static void main(String[] args) {
String zookeeperConnectionString = "localhost:2181";
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zookeeperConnectionString)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
TreeCache treeCache = new TreeCache(client, "/example/path");
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) {
String path = event.getData().getPath();
if ("/example/path".equals(path)) {
System.out.println("Event for the root node:");
} else {
System.out.println("Event for a child node:");
}
switch (event.getType()) {
case NODE_ADDED:
System.out.println("Node added: " + path);
break;
case NODE_UPDATED:
System.out.println("Node updated: " + path);
break;
case NODE_REMOVED:
System.out.println("Node removed: " + path);
break;
default:
break;
}
}
};
treeCache.getListenable().addListener(listener);
try {
treeCache.start();
// 運行一段時間,等待事件發生
Thread.sleep(10000); // 示例中運行10秒
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關閉資源
try {
treeCache.close();
} catch (Exception e) {
e.printStackTrace();
}
client.close();
}
}
}