博客 / 詳情

返回

Zookeeper 原生API VS Apache Curator 監聽

1、Zookeeper API

1.1、描述

在 ZooKeeper 中,Watcher 是一次性的,不會自動重新註冊因此,如果你希望在特定事件(如節點數據變化)發生後繼續監聽其他事件(如節點刪除),你需要在每次事件觸發時重新註冊 Watcher

1.2、示例

首先,確保你在項目中添加了 Zookeeper 的依賴:

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.6.3</version>
</dependency>

完整示例代碼

import org.apache.zookeeper.*;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class ZooKeeperExample {

    private static ZooKeeper zk;
    private static final String ZK_SERVER = "localhost:2181";
    private static final int SESSION_TIMEOUT = 3000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        // 創建 ZooKeeper 客户端
        zk = new ZooKeeper(ZK_SERVER, SESSION_TIMEOUT, new MyWatcher());

        // 創建節點
        zk.create("/example", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new MyStringCallback(), null);

        // 檢查節點狀態並設置 Watcher
        zk.exists("/example", true, new MyStatCallback(), null);

        // 觸發節點變化事件
        zk.setData("/example", "newData".getBytes(), -1);

        // 刪除節點,驗證 Watcher 是否會監聽刪除事件
        Thread.sleep(2000);
        zk.delete("/example", -1);

        // 保持程序運行以觀察回調
        Thread.sleep(10000);

        // 關閉 ZooKeeper 客户端
        zk.close();
    }

    // Watcher 實現類
    static class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("Watcher received event: " + event);
            try {
                if (event.getType() == Event.EventType.NodeDataChanged || event.getType() == Event.EventType.NodeDeleted) {
                    // 重新設置 Watcher
                    zk.exists(event.getPath(), true);
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // StatCallback 實現類
    static class MyStatCallback implements StatCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            if (rc == 0) {
                System.out.println("Node exists: " + path + ", stat: " + stat);
            } else {
                System.out.println("Node does not exist: " + path);
            }
        }
    }

    // StringCallback 實現類
    static class MyStringCallback implements StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (rc == 0) {
                System.out.println("Node created: " + name);
            } else {
                System.out.println("Node creation failed: " + KeeperException.Code.get(rc));
            }
        }
    }
}

1.3、結果驗證

  • 首次觸發 Watcher:當第一次修改節點數據時,Watcher 會觸發並打印事件信息。
  • 刪除節點:刪除節點時,Watcher 會再次觸發並打印事件信息,因為在 process 方法中重新註冊了 Watcher。

通過這個完整的示例代碼,你可以驗證 Watcher 是一次性的,並且需要在每次觸發後重新註冊以繼續監聽其他事件,包括節點刪除事件

2、Apache Curator

2.1、描述

Apache Curator 是一個用於簡化 ZooKeeper 客户端開發的 Java 庫。它提供了更高層次的 API 來處理 ZooKeeper 的常見任務,例如連接管理、會話管理、重試機制等。以下是一個完整的實例,演示瞭如何使用 Curator 庫來進行回調和監聽節點事件

2.2、示例

確保在你的項目中添加了 Curator 的依賴:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

完整示例代碼

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TreeCacheExample {

    private static final String ZK_SERVER = "localhost:2181";
    private static final String ZNODE_PATH = "/example";

    public static void main(String[] args) throws Exception {
        // 創建 Curator 客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3));
        client.start();

        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState) {
                switch (newState) {
                    case CONNECTED:
                        System.out.println("Registry connected");
                        break;
                    case LOST:
                        System.out.println("Registry disconnected");
                        break;
                    case RECONNECTED:
                        System.out.println("Registry reconnected");
                        break;
                    case SUSPENDED:
                        System.out.println("Registry suspended");
                        break;
                    default:
                        break;
                }
            }
        });

        // 創建 TreeCache
        TreeCache treeCache = new TreeCache(client, ZNODE_PATH);

        // 添加 TreeCache 監聽器
        TreeCacheListener listener = new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case NODE_ADDED:
                        System.out.println("Node added: " + event.getData().getPath());
                        break;
                    case NODE_UPDATED:
                        System.out.println("Node updated: " + event.getData().getPath());
                        break;
                    case NODE_REMOVED:
                        System.out.println("Node removed: " + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        };
        treeCache.getListenable().addListener(listener);

        // 啓動 TreeCache
        treeCache.start();

        // 進行一些節點操作以觸發事件
        client.create().forPath(ZNODE_PATH + "/child", "data".getBytes());
        client.setData().forPath(ZNODE_PATH + "/child", "newData".getBytes());
        client.delete().forPath(ZNODE_PATH + "/child");

        // 保持程序運行以觀察回調
        Thread.sleep(10000);

        // 關閉 TreeCache 和 Curator 客户端
        treeCache.close();
        client.close();
    }
}

2.3、結果驗證

  • ConnectionStateListener 監聽Zookeeper的鏈接狀態
  • TreeCacheListener 監聽Zookeeper 路徑的事件

3、對比

TreeCache 和 Watcher 是 Apache Curator 框架和 Apache ZooKeeper 中的兩種不同的監聽機制,用於監視 ZooKeeper 集羣中的節點變化。它們的註冊和使用方式有所不同,原因主要在於它們的設計目的和工作機制

Watcher

  • 作用: Watcher 是 ZooKeeper 提供的一種機制,用於監視節點的變化。每次客户端對 ZooKeeper 進行讀取操作時,可以附帶一個 Watcher,當該節點發生變化時,ZooKeeper 會通知客户端。
  • 多次註冊: Watcher 是一次性的,即每次觸發後就失效了。如果客户端需要繼續監聽該節點的變化,則需要重新註冊 Watcher。因此,Watcher 的設計是為了輕量級、精細粒度的監聽。

TreeCache

  • 作用: TreeCache 是 Apache Curator 提供的一個高級緩存機制,用於監視一個節點及其子節點的變化。它不僅僅是監視單個節點,而是監視整個子樹的變化,並將數據緩存在本地。
  • 一次註冊: TreeCache 通過一次註冊,可以持續監視整個子樹的變化,並保持緩存的更新。這是因為 TreeCache 維護了一個本地緩存,能夠自動處理 ZooKeeper 的事件通知,保持緩存數據的一致性。它通過內部的機制,自動重新註冊 Watcher,以保證對子樹的變化持續監控。因此,用户不需要手動多次註冊

為什麼 TreeCache 不像 Watcher 一樣多次註冊?

1.    設計目的: TreeCache 的設計目的是為了方便地監視和緩存整個子樹的變化,提供一個高效、易用的接口來處理複雜的節點監控需求。而 Watcher 是為了提供一個更底層的、精細粒度的節點監控機制,適合一些簡單、輕量級的使用場景。
2.    內部實現: TreeCache 內部會自動管理 Watcher 的註冊和事件處理,因此用户不需要手動多次註冊 Watcher。而 Watcher 需要用户手動管理和重新註冊,以確保對節點的持續監控。
3.    使用場景: 對於需要監控多個節點或整個子樹變化的場景,TreeCache 提供了更高效和便捷的解決方案,而不需要用户頻繁註冊 Watcher。

總結來説,TreeCache 和 Watcher 是為了滿足不同需求而設計的。TreeCache 通過一次註冊,自動管理對節點及子節點的持續監控和緩存更新,而 Watcher 需要用户手動多次註冊以實現持續監控

user avatar yadong_zhang 頭像 buxiyan 頭像 shadowck 頭像 willliaowh 頭像 beiyinglunkuo 頭像 tina_tang 頭像
6 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.