
ZAB vs. Paxos: Engineering Practice and an In-Depth Comparison of Distributed Consensus Algorithms

The code in this article targets Java 11+.

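Consistency is one of the core challenges in building reliable distributed systems. ZooKeeper's ZAB protocol and the Paxos algorithm are two mainstream solutions, and they differ in both their theoretical foundations and their engineering implementations. This article analyzes their implementation mechanisms, performance characteristics, and best practices in depth.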

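1. Basic Concepts

The ZAB Protocol

ZAB (ZooKeeper Atomic Broadcast) is a consensus protocol designed specifically for ZooKeeper. Its core goal is to guarantee that data updates are applied atomically and in the same order on every server.

The Paxos Algorithm

Paxos, proposed by Leslie Lamport, is a general-purpose consensus algorithm and the theoretical foundation of many distributed systems. It addresses how a set of nodes can reach agreement on a value over an unreliable network.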
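The heart of Paxos is the acceptor's two rules: promise to ignore lower-numbered proposals, and report any value already accepted so that a later proposer re-proposes it instead of overwriting it. The following is a minimal, in-memory sketch of a single-decree acceptor. The names `PaxosAcceptor`, `Promise`, `prepare`, and `accept` are illustrative assumptions, not any library's API. A real acceptor must also persist its promised ballot and accepted value to stable storage before replying, and a value counts as chosen only once a majority of acceptors accept the same ballot.

```java
/**
 * Minimal single-decree Paxos acceptor (illustrative sketch, in-memory only).
 * A production acceptor must persist its state before replying.
 */
final class PaxosAcceptor {

    /** Reply to a prepare (phase 1a) request. */
    static final class Promise {
        final boolean ok;              // true if the ballot was promised
        final long acceptedBallot;     // -1 if nothing accepted yet
        final String acceptedValue;    // null if nothing accepted yet

        Promise(boolean ok, long acceptedBallot, String acceptedValue) {
            this.ok = ok;
            this.acceptedBallot = acceptedBallot;
            this.acceptedValue = acceptedValue;
        }
    }

    private long promisedBallot = -1;  // highest ballot promised so far
    private long acceptedBallot = -1;  // ballot of the last accepted proposal
    private String acceptedValue = null;

    /** Phase 1b: promise to ignore every ballot lower than {@code ballot}. */
    synchronized Promise prepare(long ballot) {
        if (ballot > promisedBallot) {
            promisedBallot = ballot;
            // Report the previously accepted value so the proposer must re-propose it
            return new Promise(true, acceptedBallot, acceptedValue);
        }
        return new Promise(false, acceptedBallot, acceptedValue);
    }

    /** Phase 2b: accept unless a higher ballot has been promised in the meantime. */
    synchronized boolean accept(long ballot, String value) {
        if (ballot >= promisedBallot) {
            promisedBallot = ballot;
            acceptedBallot = ballot;
            acceptedValue = value;
            return true;
        }
        return false;
    }
}
```

A proposer sends `prepare(b)` to all acceptors. If a majority reply `ok`, it must propose the value carrying the highest `acceptedBallot` among those replies (or its own value if none reported one), and then send `accept(b, value)`.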

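2. ZAB Protocol Implementation

ZAB operates in two modes:

  1. Recovery mode: entered at startup, or when the Leader crashes or loses contact with a quorum
  2. Broadcast mode: handles write requests during normal operation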
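The two modes can be read as a small state machine over the `ServerState` values used later in `ZABRecovery`: a `LOOKING` peer is in recovery, while `LEADING` and `FOLLOWING` peers take part in broadcast. The following is a hypothetical sketch (`ZabModes` and its methods are illustrative names) that models only these transitions, not the election itself.

```java
/** Illustrative model of a ZAB peer moving between recovery and broadcast. */
final class ZabModes {
    enum ServerState { LOOKING, FOLLOWING, LEADING }
    enum Mode { RECOVERY, BROADCAST }

    private ServerState state = ServerState.LOOKING; // every peer starts in recovery

    /** LOOKING peers are recovering; LEADING and FOLLOWING peers broadcast. */
    synchronized Mode mode() {
        return state == ServerState.LOOKING ? Mode.RECOVERY : Mode.BROADCAST;
    }

    synchronized ServerState state() {
        return state;
    }

    /** Election, discovery, and synchronization finished: take the chosen role. */
    synchronized void onRecoveryComplete(boolean electedLeader) {
        if (state != ServerState.LOOKING) {
            throw new IllegalStateException("not in recovery: " + state);
        }
        state = electedLeader ? ServerState.LEADING : ServerState.FOLLOWING;
    }

    /** Leader crash or loss of quorum: the peer falls back to recovery. */
    synchronized void onLeaderOrQuorumLost() {
        state = ServerState.LOOKING;
    }
}
```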

Core Interface Definitions

public interface ZabProcessor {
    // Recovery-mode operation
    boolean startRecovery() throws RecoveryException;

    // Broadcast-mode operations
    CompletableFuture<Boolean> processWrite(Request request);
    CompletableFuture<Result> processRead(String key, ConsistencyLevel level);

    // State queries
    boolean isLeader();
    long getCurrentEpoch();
}

public interface NetworkClient {
    // Basic network operations
    void connect(String serverId, String address, int port) throws IOException;
    void disconnect(String serverId);

    // ZAB protocol messages
    ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException;
    void sendCommit(String serverId, CommitPacket commit) throws IOException;
    LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt) throws IOException;
    boolean sendTruncate(String serverId, TruncatePacket truncPkt) throws IOException;
    boolean sendTransactions(String serverId, List<Transaction> txns) throws IOException;
    boolean sendNewLeader(String serverId, NewLeaderPacket newLeaderPkt) throws IOException;
    void sendHeartbeat(String serverId, long zxid) throws IOException;
    void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException;
}

public interface StateMachine {
    void apply(long zxid, byte[] command) throws Exception;
    long getLastAppliedZxid();
    byte[] takeSnapshot() throws Exception;
    void restoreSnapshot(byte[] snapshot, long zxid) throws Exception;
}
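The recovery code below calls `ZxidUtils.getEpochFromZxid(...)` without defining it. A zxid packs the epoch into the high 32 bits and a per-epoch counter into the low 32 bits, so comparing two zxids as plain `long` values orders them by epoch first and then by counter. Here is a minimal sketch of such a helper, assuming an `int` epoch to match that call site; ZooKeeper's own server code has a similar `ZxidUtils` class that uses `long` epochs. The method names below are illustrative.

```java
/**
 * Hypothetical zxid helpers (epoch in the high 32 bits, counter in the low 32 bits).
 * The epoch is an int here to match ZxidUtils.getEpochFromZxid(...) in the recovery code.
 */
final class ZxidUtils {
    private ZxidUtils() {}

    /** Packs an epoch and a per-epoch counter into one zxid. */
    static long makeZxid(int epoch, long counter) {
        // Mask the counter so it can never spill into the epoch bits
        return ((long) epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    static int getEpochFromZxid(long zxid) {
        return (int) (zxid >>> 32);
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    /** Hex form, as used in the log messages. */
    static String toHex(long zxid) {
        return "0x" + Long.toHexString(zxid);
    }
}
```

For example, `makeZxid(3, 7)` yields `0x300000007`, and every zxid of epoch 2 compares lower than any zxid of epoch 3.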

ZAB Recovery Mode Implementation

public class ZABRecovery {
    private final AtomicLong zxid = new AtomicLong(0);
    private final AtomicInteger epoch = new AtomicInteger(0);
    private volatile ServerState state = ServerState.LOOKING;
    private final Logger logger = LoggerFactory.getLogger(ZABRecovery.class);
    private final ConcurrentMap<String, ServerData> serverDataMap;
    private final int quorumSize;   // total number of servers in the ensemble (see getQuorum())
    private final NetworkClient networkClient;
    private final StateMachine stateMachine;
    private final String serverId;

    // Constructor
    public ZABRecovery(String serverId, int quorumSize, NetworkClient networkClient,
                      StateMachine stateMachine) {
        this.serverId = serverId;
        this.quorumSize = quorumSize;
        this.networkClient = networkClient;
        this.stateMachine = stateMachine;
        this.serverDataMap = new ConcurrentHashMap<>();
    }

    // Leader recovery flow
    public boolean startRecovery() throws RecoveryException {
        MDC.put("component", "zab-recovery");
        MDC.put("serverId", serverId);
        try {
            // 1. Advance to a new epoch
            int newEpoch = epoch.incrementAndGet();
            logger.info("Starting recovery with epoch: {}", newEpoch);

            // 2. Discovery: collect the latest state of every Follower
            Map<Long, Set<String>> commitMap = discoverFollowerStates();

            // 3. Determine the truncation point, which is also the commit point
            long truncateZxid = determineMaxCommittedZxid(commitMap);
            logger.info("Determined truncate zxid: {}", Long.toHexString(truncateZxid));

            // 4. Resolve possible conflicts (e.g. after a split brain)
            resolveConflictsAfterPartition(truncateZxid, commitMap);

            // 5. Synchronization: bring the Followers up to date with the committed history
            syncFollowers(truncateZxid);

            // 6. Switch to broadcast mode
            state = ServerState.LEADING;
            logger.info("Recovery completed, switching to broadcast mode");
            return true;
        } catch (IOException e) {
            logger.error("Recovery failed due to I/O error", e);
            state = ServerState.LOOKING;
            throw new RecoveryException("I/O error during recovery", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Recovery interrupted", e);
            state = ServerState.LOOKING;
            throw new RecoveryException("Recovery process interrupted", e);
        } catch (Exception e) {
            logger.error("Unexpected error during recovery", e);
            state = ServerState.LOOKING;
            throw new RecoveryException("Unexpected error during recovery", e);
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
        }
    }

    // Discovery phase: collect the latest transaction ID from every Follower
    private Map<Long, Set<String>> discoverFollowerStates() throws IOException, InterruptedException {
        Map<Long, Set<String>> acceptedZxids = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(serverDataMap.size());
        List<CompletableFuture<?>> futures = new ArrayList<>();

        // Send a CEPOCH message to every Follower
        for (var entry : serverDataMap.entrySet()) {
            final String targetServerId = entry.getKey();

            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                MDC.put("targetServerId", targetServerId);
                try {
                    // Send the new epoch
                    EpochPacket epochPkt = new EpochPacket(epoch.get());
                    LastZxidResponse response = networkClient.sendEpochRequest(
                        targetServerId, epochPkt);

                    // Record this server's last zxid
                    synchronized (acceptedZxids) {
                        acceptedZxids.computeIfAbsent(response.getLastZxid(), k -> new HashSet<>())
                                    .add(targetServerId);
                    }

                    logger.info("Server {} last zxid: {}", targetServerId,
                               Long.toHexString(response.getLastZxid()));
                } catch (IOException e) {
                    logger.error("Failed to discover state from server: {}", targetServerId, e);
                } finally {
                    MDC.remove("targetServerId");
                    latch.countDown();
                }
            });

            futures.add(future);
        }

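        // Wait for all responses, or give up after 10 seconds and use whatever arrived
        if (!latch.await(10, TimeUnit.SECONDS)) {
            logger.warn("Discovery phase timed out, proceeding with available responses");
        }

        // Mark unfinished tasks as cancelled (CompletableFuture.cancel does not interrupt the running thread)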
        for (CompletableFuture<?> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
            }
        }

        return acceptedZxids;
    }

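    // Determine the highest committed transaction ID that must be kept
    private long determineMaxCommittedZxid(Map<Long, Set<String>> commitMap) {
        // Find the highest zxid held by a quorum. Logs are prefixes: a server whose last
        // zxid is Z also holds every lower zxid, so count every server at or above a candidate
        long maxZxid = 0;
        int quorum = getQuorum();

        for (long candidate : commitMap.keySet()) {
            int holders = 0;
            for (var entry : commitMap.entrySet()) {
                if (entry.getKey() >= candidate) {
                    holders += entry.getValue().size();
                }
            }
            if (holders >= quorum && candidate > maxZxid) {
                maxZxid = candidate;
            }
        }
        return maxZxid;
    }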

    // Resolve data conflicts that may remain after a network partition
    private void resolveConflictsAfterPartition(long truncateZxid,
                                             Map<Long, Set<String>> commitMap) {
        logger.info("Checking for potential conflicts after network partition");

        // 1. Identify potentially conflicting transactions: higher zxids not held by a quorum
        List<ConflictingTransaction> conflicts = new ArrayList<>();

        for (var entry : commitMap.entrySet()) {
            long txnZxid = entry.getKey();
            Set<String> servers = entry.getValue();

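            // The zxid is above the truncation point but was not confirmed by a quorum
            if (txnZxid > truncateZxid && servers.size() < getQuorum()) {
                // Extract the epochs of the transaction and of the truncation point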
                int txnEpoch = ZxidUtils.getEpochFromZxid(txnZxid);
                int truncateEpoch = ZxidUtils.getEpochFromZxid(truncateZxid);

                conflicts.add(new ConflictingTransaction(txnZxid, truncateZxid,
                                                      txnEpoch, truncateEpoch,
                                                      servers));
            }
        }

        // 2. Handle the conflicts
        if (!conflicts.isEmpty()) {
            logger.warn("Found {} potential conflicting transactions after partition",
                       conflicts.size());

            for (ConflictingTransaction conflict : conflicts) {
                if (conflict.isFromHigherEpoch()) {
                    logger.warn("Conflict: transaction with zxid {} from higher epoch {} " +
                              "found but not in majority. Will be discarded.",
                              Long.toHexString(conflict.getConflictZxid()),
                              conflict.getConflictEpoch());
                } else {
                    logger.warn("Conflict: transaction with zxid {} from same epoch {} " +
                              "found but not in majority. Will be discarded.",
                              Long.toHexString(conflict.getConflictZxid()),
                              conflict.getConflictEpoch());
                }

                // Tell these servers to truncate the conflicting transactions
                notifyServersToTruncate(conflict.getServers(), truncateZxid);
            }
        } else {
            logger.info("No conflicting transactions found");
        }
    }

    // Tell servers to truncate transactions beyond the safe point
    private void notifyServersToTruncate(Set<String> servers, long truncateZxid) {
        for (String serverId : servers) {
            CompletableFuture.runAsync(() -> {
                try {
                    TruncatePacket truncPkt = new TruncatePacket(truncateZxid);
                    boolean success = networkClient.sendTruncate(serverId, truncPkt);
                    if (success) {
                        logger.info("Successfully notified server {} to truncate to zxid {}",
                                   serverId, Long.toHexString(truncateZxid));
                    } else {
                        logger.warn("Failed to notify server {} to truncate", serverId);
                    }
                } catch (IOException e) {
                    logger.error("Error notifying server {} to truncate", serverId, e);
                }
            });
        }
    }

    // Synchronization phase: send the committed history to the Followers
    private void syncFollowers(long truncateZxid) throws IOException, InterruptedException {
        // Load all transactions starting at truncateZxid
        List<Transaction> txns = loadTransactionsFromLog(truncateZxid);
        logger.info("Syncing {} transactions to followers", txns.size());

        // Sync all Followers in parallel
        CountDownLatch syncLatch = new CountDownLatch(serverDataMap.size());
        AtomicInteger successCount = new AtomicInteger(0);
        List<CompletableFuture<?>> futures = new ArrayList<>();

        for (var entry : serverDataMap.entrySet()) {
            final String targetServerId = entry.getKey();
            final ServerData serverData = entry.getValue();

            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                MDC.put("targetServerId", targetServerId);
                try {
                    // Use a snapshot if this Follower is too far behind
                    // (a raw zxid difference also jumps by 2^32 across an epoch boundary)
                    long followerZxid = serverData.getLastZxid();
                    boolean synced;
                    if (truncateZxid - followerZxid > SNAPSHOT_THRESHOLD) {
                        syncFollowerWithSnapshot(targetServerId, followerZxid);
                        synced = true;
                    } else {
                        // 1. TRUNC: tell the Follower to truncate its log
                        // 2. DIFF: send the missing transactions
                        synced = networkClient.sendTruncate(targetServerId, new TruncatePacket(truncateZxid))
                                && networkClient.sendTransactions(targetServerId, txns);
                    }
                    // 3. NEWLEADER: confirm that synchronization is complete (after either path)
                    if (synced && networkClient.sendNewLeader(targetServerId,
                                                              new NewLeaderPacket(epoch.get()))) {
                        // Sync succeeded
                        successCount.incrementAndGet();
                        logger.info("Successfully synced server: {}", targetServerId);
                    }
                } catch (IOException e) {
                    logger.error("Failed to sync server {} with {} transactions, last zxid: {}",
                                targetServerId, txns.size(), Long.toHexString(truncateZxid), e);
                } finally {
                    MDC.remove("targetServerId");
                    syncLatch.countDown();
                }
            });

            futures.add(future);
        }

        // Wait for synchronization to finish or time out
        if (!syncLatch.await(30, TimeUnit.SECONDS)) {
            logger.warn("Sync phase timed out");
        }

        // Mark unfinished tasks as cancelled
        for (CompletableFuture<?> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
            }
        }

        // Check that a quorum synced successfully
        // (conservative: the Leader itself is not counted in successCount)
        if (successCount.get() < getQuorum()) {
            throw new QuorumNotFoundException("Failed to sync with quorum of followers",
                                           successCount.get(), getQuorum());
        }
    }

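    // Sync a Follower that is too far behind by sending a snapshot
    private void syncFollowerWithSnapshot(String followerId, long followerZxid) throws IOException {
        try {
            logger.info("Follower {} is too far behind (zxid: {}), syncing with snapshot",
                       followerId, Long.toHexString(followerZxid));

            // 1. Take a snapshot of the current state and tag it with the
            //    state machine's last applied zxid
            long snapshotZxid = stateMachine.getLastAppliedZxid();
            byte[] snapshot = stateMachine.takeSnapshot();

            // 2. Send the snapshot to the Follower
            networkClient.sendSnapshot(followerId, snapshot, snapshotZxid);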

            logger.info("Successfully sent snapshot to follower: {}", followerId);
        } catch (Exception e) {
            logger.error("Failed to sync follower {} with snapshot", followerId, e);
            throw new IOException("Snapshot sync failed", e);
        }
    }

    // Load transactions from the transaction log
    private List<Transaction> loadTransactionsFromLog(long fromZxid) throws IOException {
        List<Transaction> result = new ArrayList<>();
        // A real implementation would read the records from persistent storage
        logger.info("Loading transactions starting from zxid: {}", Long.toHexString(fromZxid));
        return result;
    }

    private int getQuorum() {
        return quorumSize / 2 + 1;
    }

    // Constants
    private static final long SNAPSHOT_THRESHOLD = 100000; // use a snapshot when the gap exceeds 100,000 transactions

    // Record describing a conflicting transaction
    static class ConflictingTransaction {
        private final long conflictZxid;
        private final long truncateZxid;
        private final int conflictEpoch;
        private final int truncateEpoch;
        private final Set<String> servers;

        public ConflictingTransaction(long conflictZxid, long truncateZxid,
                                    int conflictEpoch, int truncateEpoch,
                                    Set<String> servers) {
            this.conflictZxid = conflictZxid;
            this.truncateZxid = truncateZxid;
            this.conflictEpoch = conflictEpoch;
            this.truncateEpoch = truncateEpoch;
            this.servers = new HashSet<>(servers);
        }

        public boolean isFromHigherEpoch() {
            return conflictEpoch > truncateEpoch;
        }

        public long getConflictZxid() {
            return conflictZxid;
        }

        public int getConflictEpoch() {
            return conflictEpoch;
        }

        public Set<String> getServers() {
            return Collections.unmodifiableSet(servers);
        }
    }

    // Other inner class definitions omitted...

    enum ServerState {
        LOOKING,     // searching for a Leader
        FOLLOWING,   // acting as a Follower
        LEADING      // acting as the Leader
    }
}
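Two decisions in the recovery flow above are easy to isolate: which zxid a quorum already holds, and how to bring a single Follower up to date. The sketch below is hypothetical (`RecoveryDecisions`, `highestQuorumZxid`, and `chooseSyncMode` are illustrative names). It assumes each server's log is a gap-free prefix, so a server whose last zxid is Z also holds every lower zxid. It chooses among DIFF, TRUNC, and SNAP loosely in the way ZooKeeper's Leader does, based on the range of committed proposals the Leader still retains, and it ignores the case of a Follower whose log diverges inside that range, which needs TRUNC followed by DIFF.

```java
import java.util.Collection;

/** Hypothetical helpers for two decisions a recovering Leader has to make. */
final class RecoveryDecisions {
    enum SyncMode { DIFF, TRUNC, SNAP }

    private RecoveryDecisions() {}

    /**
     * Highest zxid held by at least {@code quorum} servers, given each server's last zxid.
     * Assumes logs are gap-free prefixes. Returns 0 if fewer than {@code quorum} replied.
     */
    static long highestQuorumZxid(Collection<Long> lastZxids, int quorum) {
        if (quorum <= 0 || lastZxids.size() < quorum) {
            return 0L;
        }
        long[] sorted = lastZxids.stream().mapToLong(Long::longValue).sorted().toArray();
        // Every server at or above the quorum-th largest value holds it,
        // and there are at least `quorum` such servers
        return sorted[sorted.length - quorum];
    }

    /**
     * Chooses how to sync one Follower, given the range [minLogZxid, maxLogZxid]
     * of committed proposals the Leader still keeps in its log.
     */
    static SyncMode chooseSyncMode(long followerZxid, long minLogZxid, long maxLogZxid) {
        if (followerZxid > maxLogZxid) {
            return SyncMode.TRUNC; // Follower has entries the quorum never committed
        }
        if (followerZxid >= minLogZxid) {
            return SyncMode.DIFF;  // replay the missing suffix (empty if already equal)
        }
        return SyncMode.SNAP;      // older than anything retained: send a full snapshot
    }
}
```

With last zxids `[5, 7, 9]` and a quorum of 2, `highestQuorumZxid` returns 7, because the two servers at 7 and 9 both hold it.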

ZAB Broadcast Mode Implementation
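In broadcast mode the Leader runs a two-phase exchange per write: it sends a PROPOSAL to every Follower, then a COMMIT once a quorum of the ensemble has acknowledged. The counting rule is easy to get wrong. The quorum is a strict majority of the whole ensemble, not of the Followers alone, and the Leader's own log write counts as one ACK. The following hypothetical tracker isolates that rule before the full class. `ProposalAckTracker` and its methods are illustrative names, and the sketch assumes the Leader has already logged the proposal locally.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical per-proposal ACK tracker for the broadcast phase.
 * Quorum is a strict majority of the ensemble (Leader + Followers),
 * and the Leader's own log write counts as one ACK.
 */
final class ProposalAckTracker {
    private final long zxid;
    private final int ensembleSize;
    private final Set<String> acked = new HashSet<>();
    private boolean committed = false;

    ProposalAckTracker(long zxid, int ensembleSize, String leaderId) {
        if (ensembleSize < 1) {
            throw new IllegalArgumentException("ensembleSize must be >= 1");
        }
        this.zxid = zxid;
        this.ensembleSize = ensembleSize;
        acked.add(leaderId); // assumes the Leader already logged its own proposal
    }

    int quorum() {
        return ensembleSize / 2 + 1;
    }

    /**
     * Records an ACK. Returns true exactly once, on the ACK that first reaches quorum
     * (the caller should then broadcast COMMIT). Duplicate, stale, or late ACKs return false.
     */
    synchronized boolean onAck(String serverId, long ackZxid) {
        if (ackZxid != zxid || committed) {
            return false; // ACK for another proposal, or already committed
        }
        acked.add(serverId); // a Set ignores duplicate ACKs from the same server
        if (acked.size() >= quorum()) {
            committed = true;
            return true;
        }
        return false;
    }
}
```

In a five-server ensemble the quorum is 3, so the COMMIT can go out after the Leader plus two Followers have acknowledged.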

public class ZABBroadcast implements AutoCloseable {
    private final AtomicLong zxid;
    private final AtomicInteger epoch;
    private final ConcurrentMap<String, ServerData> followers;
    private final Logger logger = LoggerFactory.getLogger(ZABBroadcast.class);
    private final CircuitBreaker circuitBreaker;
    private final NetworkClient networkClient;
    private final StateMachine stateMachine;
    private final String serverId;
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ScheduledExecutorService scheduler;
    private final MetricsCollector metrics;
    private final RateLimiter heartbeatLogLimiter = RateLimiter.create(0.1); // at most one log line every 10 seconds

    public ZABBroadcast(String serverId, AtomicLong zxid, AtomicInteger epoch,
                       NetworkClient networkClient, StateMachine stateMachine) {
        this.serverId = serverId;
        this.zxid = zxid;
        this.epoch = epoch;
        this.networkClient = networkClient;
        this.stateMachine = stateMachine;
        this.followers = new ConcurrentHashMap<>();
        this.circuitBreaker = new CircuitBreaker(5, 10000); // opens after 5 failures, resets after 10 seconds
        this.scheduler = Executors.newScheduledThreadPool(2, r -> {
            Thread t = new Thread(r, "zab-scheduler-" + serverId);
            t.setDaemon(true);
            return t;
        });
        this.metrics = new MetricsCollector("zab_broadcast");

        // Start the heartbeat task
        scheduler.scheduleWithFixedDelay(this::sendHeartbeats,
                                       500, 500, TimeUnit.MILLISECONDS);
    }

    // Register a Follower
    public void addFollower(ServerData follower) {
        followers.put(follower.getId(), follower);
    }

    // Leader: handle a write request
    public CompletableFuture<Boolean> processWrite(Request request) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        MDC.put("component", "zab-broadcast");
        MDC.put("serverId", serverId);
        MDC.put("requestId", request.getId());

        try {
            return GlobalExceptionHandler.withExceptionHandling(
                circuitBreaker.execute(() -> {
                    try {
                        // 1. Assign a zxid to the request (high 32 bits: epoch, low 32 bits: counter)
                        long newZxid = createNewZxid();
                        MDC.put("zxid", Long.toHexString(newZxid));
                        logger.info("Processing write request: {} with zxid: {}",
                                   request.getId(), Long.toHexString(newZxid));

                        // 2. Send the proposal to all Followers
                        List<Future<ACK>> futures = sendToFollowers(request, newZxid);

                        // 3. Wait for ACKs from a majority
                        if (waitForMajority(futures)) {
                            // 4. Tell all Followers to commit the transaction
                            commit(newZxid);
                            logger.info("Request {} committed successfully", request.getId());

                            // 5. Record metrics
                            metrics.recordSuccessfulWrite(stopwatch.elapsed(TimeUnit.MILLISECONDS));
                            return CompletableFuture.completedFuture(true);
                        } else {
                            logger.warn("Failed to get majority ACKs for request {}", request.getId());
                            metrics.recordFailedWrite();
                            return CompletableFuture.completedFuture(false);
                        }
                    } catch (IOException e) {
                        logger.error("Failed to process write request: {}", request.getId(), e);
                        metrics.recordFailedWrite();
                        return CompletableFuture.failedFuture(
                            new ProcessingException("Failed to process write request", e));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.warn("Interrupted while processing write request: {}", request.getId(), e);
                        metrics.recordFailedWrite();
                        return CompletableFuture.failedFuture(
                            new ProcessingException("Interrupted during write processing", e));
                    }
                })
            );
        } catch (CircuitBreakerOpenException e) {
            logger.error("Circuit breaker is open, rejecting request: {}", request.getId());
            metrics.recordRejectedWrite();
            return CompletableFuture.failedFuture(
                new ProcessingException("Circuit breaker open, system overloaded", e));
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
            MDC.remove("requestId");
            MDC.remove("zxid");
        }
    }

    // Handle a batch of write requests to improve throughput
    public CompletableFuture<Map<String, Boolean>> processBatchWrite(List<Request> requests) {
        if (requests.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        MDC.put("component", "zab-broadcast");
        MDC.put("serverId", serverId);
        MDC.put("batchSize", String.valueOf(requests.size()));

        try {
            return GlobalExceptionHandler.withExceptionHandling(
                circuitBreaker.execute(() -> {
                    Map<String, Boolean> results = new HashMap<>();
                    try {
                        // Build the batch
                        BatchRequest batch = new BatchRequest();
                        for (Request req : requests) {
                            batch.addRequest(req);
                            results.put(req.getId(), false); // fail by default
                        }

                        // One zxid for the whole batch
                        long batchZxid = createNewZxid();
                        MDC.put("zxid", Long.toHexString(batchZxid));
                        logger.info("Processing batch of {} requests with zxid: {}",
                                   requests.size(), Long.toHexString(batchZxid));

                        // Send the batch to all Followers
                        List<Future<ACK>> futures = sendBatchToFollowers(batch, batchZxid);

                        // Wait for a majority to acknowledge
                        if (waitForMajority(futures)) {
                            // Commit the batch
                            commitBatch(batchZxid);
                            logger.info("Batch with {} requests committed successfully", requests.size());

                            // Mark every request as successful
                            for (Request req : requests) {
                                results.put(req.getId(), true);
                            }

                            metrics.recordSuccessfulBatchWrite(
                                requests.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
                        } else {
                            logger.warn("Failed to get majority ACKs for batch");
                            metrics.recordFailedBatchWrite(requests.size());
                        }
                    } catch (Exception e) {
                        logger.error("Error processing batch write of {} requests", requests.size(), e);
                        metrics.recordFailedBatchWrite(requests.size());
                    }
                    return CompletableFuture.completedFuture(results);
                })
            );
        } catch (CircuitBreakerOpenException e) {
            logger.error("Circuit breaker is open, rejecting batch of {} requests", requests.size());
            metrics.recordRejectedBatchWrite(requests.size());

            return CompletableFuture.failedFuture(
                new ProcessingException("Circuit breaker open, system overloaded", e));
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
            MDC.remove("batchSize");
            MDC.remove("zxid");
        }
    }

    // Read with a configurable consistency guarantee
    public CompletableFuture<Result> readWithConsistency(String key, ConsistencyLevel level) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        MDC.put("component", "zab-broadcast");
        MDC.put("serverId", serverId);
        MDC.put("key", key);
        MDC.put("consistency", level.name());

        try {
            ReadStrategy strategy = readStrategies.getOrDefault(
                level, readStrategies.get(ConsistencyLevel.EVENTUAL));

            // readLocal takes the key, so wrap it to match the Supplier<Result> parameter
            CompletableFuture<Result> result = strategy.execute(key, () -> readLocal(key));

            result.thenAccept(r ->
                metrics.recordRead(level, stopwatch.elapsed(TimeUnit.MILLISECONDS)));

            return result;
        } catch (Exception e) {
            logger.error("Error performing {} read for key: {}", level, key, e);
            metrics.recordFailedRead(level);
            return CompletableFuture.failedFuture(
                new ProcessingException("Read operation failed", e));
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
            MDC.remove("key");
            MDC.remove("consistency");
        }
    }

    // Read from the local replica
    private Result readLocal(String key) {
        rwLock.readLock().lock();
        try {
            // A real implementation would read from the local data store
            return new Result(key, "value", true);
        } finally {
            rwLock.readLock().unlock();
        }
    }

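    // Generate a new zxid, handling counter overflow
    private long createNewZxid() {
        rwLock.writeLock().lock();
        try {
            long currentCounter = zxid.get() & 0xFFFFFFFFL;
            // Detect overflow of the low 32-bit counter
            if (currentCounter >= 0xFFFFFFFFL) {
                // Counter exhausted: move to a new epoch. Real ZooKeeper does not do this in
                // place; on rollover its Leader steps down and forces a new election instead
                int newEpoch = epoch.incrementAndGet();
                logger.warn("ZXID counter overflow, incrementing epoch to {}", newEpoch);
                long newZxid = ((long)newEpoch << 32); // reset the counter
                zxid.set(newZxid);
                return newZxid;
            }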
            return zxid.incrementAndGet();
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Send the proposal for a single request to all Followers
    private List<Future<ACK>> sendToFollowers(Request request, long newZxid)
            throws IOException {
        ProposalPacket proposal = new ProposalPacket(newZxid, request);
        // Same fan-out logic as the batch path, so delegate instead of duplicating it
        return sendProposalToFollowers(proposal, newZxid);
    }

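    // Wait for ACKs from a quorum of the ensemble
    private boolean waitForMajority(List<Future<ACK>> futures)
            throws InterruptedException {
        // The ensemble is the Followers plus this Leader. Assuming the Leader has already
        // logged the proposal, its own ACK counts, so ensembleSize / 2 Follower ACKs suffice
        int ensembleSize = followers.size() + 1;
        int requiredFollowerAcks = ensembleSize / 2;
        // One overall deadline, instead of up to 5 seconds per Follower
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        int ackCount = 0;

        for (Future<ACK> future : futures) {
            if (ackCount >= requiredFollowerAcks) {
                // Quorum reached, return early
                return true;
            }
            try {
                long remaining = Math.max(0, deadline - System.nanoTime());
                ACK ack = future.get(remaining, TimeUnit.NANOSECONDS);
                if (ack != null && ack.isSuccess()) {
                    ackCount++;
                }
            } catch (ExecutionException e) {
                logger.warn("Error getting ACK", e.getCause());
            } catch (TimeoutException e) {
                logger.warn("Timeout waiting for ACK");
            }
        }

        return ackCount >= requiredFollowerAcks;
    }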

    // Tell all Followers to commit the transaction
    private void commit(long zxid) throws IOException {
        CommitPacket commit = new CommitPacket(zxid);

        for (var entry : followers.entrySet()) {
            final String targetServerId = entry.getKey();

            CompletableFuture.runAsync(() -> {
                MDC.put("targetServerId", targetServerId);
                try {
                    networkClient.sendCommit(targetServerId, commit);
                    logger.debug("Sent commit to {} for zxid {}",
                                targetServerId, Long.toHexString(zxid));
                } catch (IOException e) {
                    logger.error("Failed to send commit to follower {}, zxid: {}",
                                targetServerId, Long.toHexString(zxid), e);
                } finally {
                    MDC.remove("targetServerId");
                }
            });
        }
    }

    // Send a batch proposal
    private List<Future<ACK>> sendBatchToFollowers(BatchRequest batch, long batchZxid)
            throws IOException {
        ProposalPacket proposal = new ProposalPacket(batchZxid, batch);
        return sendProposalToFollowers(proposal, batchZxid);
    }

    // Commit a batch
    private void commitBatch(long batchZxid) throws IOException {
        commit(batchZxid);
    }

    // Send heartbeats to all Followers
    private void sendHeartbeats() {
        long currentZxid = zxid.get();

        for (var entry : followers.entrySet()) {
            final String targetServerId = entry.getKey();

            CompletableFuture.runAsync(() -> {
                try {
                    networkClient.sendHeartbeat(targetServerId, currentZxid);
                } catch (IOException e) {
                    // Heartbeat failed; rate-limit logging to avoid flooding the log
                    if (heartbeatLogLimiter.tryAcquire()) {
                        logger.debug("Failed to send heartbeat to {}", targetServerId, e);
                    }
                }
            });
        }
    }

    // Send a proposal to all Followers (shared helper)
    private List<Future<ACK>> sendProposalToFollowers(ProposalPacket proposal, long zxid)
            throws IOException {
        List<Future<ACK>> futures = new ArrayList<>();
        if (followers.isEmpty()) {
            // newFixedThreadPool(0) would throw IllegalArgumentException
            return futures;
        }

        // A per-call pool keeps the example simple; production code would reuse a shared executor
        ExecutorService executor = Executors.newFixedThreadPool(followers.size(),
            r -> {
                Thread t = new Thread(r, "proposal-sender-" + serverId);
                t.setDaemon(true);
                return t;
            });

        try {
            for (var entry : followers.entrySet()) {
                final String targetServerId = entry.getKey();

                futures.add(executor.submit(() -> {
                    MDC.put("targetServerId", targetServerId);
                    try {
                        ACK ack = networkClient.sendProposal(targetServerId, proposal);
                        logger.debug("Received ACK from {} for zxid {}",
                                    targetServerId, Long.toHexString(zxid));
                        return ack;
                    } catch (IOException e) {
                        logger.error("Failed to send proposal to follower {}, zxid: {}",
                                    targetServerId, Long.toHexString(zxid), e);
                        return null;
                    } finally {
                        MDC.remove("targetServerId");
                    }
                }));
            }
        } finally {
            // Stop accepting new tasks but let in-flight proposals finish; the caller bounds
            // the wait in waitForMajority. Blocking here and then calling shutdownNow()
            // would interrupt proposals before their ACKs arrive.
            executor.shutdown();
        }

        return futures;
    }

    // Read strategy interface and its implementations
    private interface ReadStrategy {
        CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal);
    }

    private final Map<ConsistencyLevel, ReadStrategy> readStrategies = new EnumMap<>(ConsistencyLevel.class);

    {
        // Register one read strategy per consistency level
        readStrategies.put(ConsistencyLevel.LINEARIZABLE, new LinearizableReadStrategy());
        readStrategies.put(ConsistencyLevel.SEQUENTIAL, new SequentialReadStrategy());
        readStrategies.put(ConsistencyLevel.READ_YOUR_WRITES, new ReadYourWritesStrategy());
        readStrategies.put(ConsistencyLevel.BOUNDED_STALENESS, new BoundedStalenessStrategy());
        readStrategies.put(ConsistencyLevel.EVENTUAL, new EventualReadStrategy());
    }

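    // Linearizable read strategy
    private class LinearizableReadStrategy implements ReadStrategy {
        private final AtomicLong leaseExpirationTime = new AtomicLong(0);
        // 5-second lease. It must be shorter than the followers' election timeout, so that no
        // new leader can be elected while this leader still believes its lease is valid.
        private final long leaderLeaseMs = 5000;

        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // The leader must confirm that it is still the leader (lease mechanism).
            // currentTimeMillis() is a wall clock and can jump; a monotonic clock
            // (System.nanoTime()) is safer for lease arithmetic.
            if (System.currentTimeMillis() < leaseExpirationTime.get()) {
                // Lease still valid: a local read is safe
                return CompletableFuture.completedFuture(readFromLocal.get());
            } else {
                // Lease expired: a quorum must confirm leadership again before reading
                return renewLease().thenApply(renewed -> {
                    if (renewed) {
                        return readFromLocal.get();
                    } else {
                        throw new ConsistencyException("Cannot guarantee linearizable read");
                    }
                });
            }
        }

        private CompletableFuture<Boolean> renewLease() {
            // Placeholder: this stub renews unconditionally, so it does not yet guarantee
            // linearizability. A real implementation records the time before contacting the
            // followers, waits for acknowledgements from a quorum, and only then sets the
            // lease to (that start time + leaderLeaseMs).
            leaseExpirationTime.set(System.currentTimeMillis() + leaderLeaseMs);
            logger.info("Renewed leader lease until {}", leaseExpirationTime.get());
            return CompletableFuture.completedFuture(true);
        }
    }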

    // Sequential-consistency read strategy
    private class SequentialReadStrategy implements ReadStrategy {
        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Make sure every committed transaction has been applied before reading
            return ensureAppliedUpToDate()
                .thenApply(v -> readFromLocal.get());
        }

        private CompletableFuture<Void> ensureAppliedUpToDate() {
            // Placeholder: a real implementation waits until all committed transactions are applied
            logger.debug("Ensuring all committed transactions are applied");
            return CompletableFuture.completedFuture(null);
        }
    }

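    // Read-your-writes strategy.
    // Caveat: waiting a fixed 100 ms is a heuristic, not a guarantee. A replica that lags by
    // more than that still returns stale data, timestamps are tracked per key rather than per
    // client session, recordWrite() must be called by the write path, and Thread.sleep blocks
    // the calling thread. A sturdier approach remembers the zxid of the client's last write
    // and defers the read until the local state machine has applied that zxid.
    private class ReadYourWritesStrategy implements ReadStrategy {
        private final ConcurrentMap<String, Long> writeTimestamps = new ConcurrentHashMap<>();

        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Check whether a write to this key has been recorded
            Long writeTime = writeTimestamps.get(key);
            if (writeTime != null) {
                // Wait until enough time has passed for the write to (probably) be applied
                long elapsed = System.currentTimeMillis() - writeTime;
                if (elapsed < 100) {  // assumes 100 ms is enough for the write to be applied
                    try {
                        Thread.sleep(100 - elapsed);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }

            return CompletableFuture.completedFuture(readFromLocal.get());
        }

        // Record a write operation
        public void recordWrite(String key) {
            writeTimestamps.put(key, System.currentTimeMillis());
        }
    }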

    // Bounded-staleness strategy.
    // The bound is measured from when a value entered this cache, not from when it was
    // committed, so total staleness can exceed maxStalenessMs if the local replica itself lags.
    private class BoundedStalenessStrategy implements ReadStrategy {
        private final ConcurrentMap<String, CacheEntry> cache = new ConcurrentHashMap<>();
        private final long maxStalenessMs = 1000; // maximum staleness: 1 second

        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Check the cache
            CacheEntry entry = cache.get(key);
            if (entry != null) {
                long age = System.currentTimeMillis() - entry.getTimestamp();
                if (age <= maxStalenessMs) {
                    // Entry is still fresh enough: return it directly
                    return CompletableFuture.completedFuture(entry.getResult());
                }
            }

            // Missing or expired: read from the local replica and refresh the cache
            Result result = readFromLocal.get();
            cache.put(key, new CacheEntry(result, System.currentTimeMillis()));
            return CompletableFuture.completedFuture(result);
        }

        // Evict expired entries; must be invoked periodically (e.g. from a scheduler)
        public void cleanup() {
            long now = System.currentTimeMillis();
            cache.entrySet().removeIf(entry ->
                now - entry.getValue().getTimestamp() > maxStalenessMs);
        }
    }

    // Eventual-consistency strategy
    private class EventualReadStrategy implements ReadStrategy {
        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Read straight from the local replica; the latest writes may not be visible yet
            return CompletableFuture.completedFuture(readFromLocal.get());
        }
    }

    // Cache entry
    private static class CacheEntry {
        private final Result result;
        private final long timestamp;

        public CacheEntry(Result result, long timestamp) {
            this.result = result;
            this.timestamp = timestamp;
        }

        public Result getResult() {
            return result;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }

    @Override
    public void close() {
        try {
            List<Runnable> pendingTasks = scheduler.shutdownNow();
            if (!pendingTasks.isEmpty()) {
                logger.warn("Scheduler shutdown with {} pending tasks", pendingTasks.size());
            }

            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                logger.warn("Scheduler did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for scheduler termination");
        }
    }

    // Circuit breaker implementation (safer, lock-guarded version)
    static class CircuitBreaker {
        private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
        private final AtomicLong failureCount = new AtomicLong(0);
        private final AtomicLong lastFailureTime = new AtomicLong(0);
        private final int threshold;
        private final long resetTimeoutMs;
        private final StampedLock stateLock = new StampedLock();
        private final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

        public enum State { CLOSED, OPEN, HALF_OPEN }

        public CircuitBreaker(int threshold, long resetTimeoutMs) {
            this.threshold = threshold;
            this.resetTimeoutMs = resetTimeoutMs;
        }

        public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> action)
                throws CircuitBreakerOpenException {
            State currentState = getCurrentState();

            if (currentState == State.OPEN) {
                // Once the reset timeout has elapsed, allow a half-open trial request
                if (System.currentTimeMillis() - lastFailureTime.get() > resetTimeoutMs) {
                    boolean transitioned = tryTransitionState(State.OPEN, State.HALF_OPEN);
                    if (!transitioned) {
                        throw new CircuitBreakerOpenException("Circuit breaker is open");
                    }
                    currentState = State.HALF_OPEN;
                } else {
                    throw new CircuitBreakerOpenException("Circuit breaker is open");
                }
            }

            final State executionState = currentState;

            try {
                CompletableFuture<T> future = action.get();
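                return future.handle((result, ex) -> {
                    if (ex != null) {
                        recordFailure();
                        // Avoid double wrapping: ex is often already a CompletionException
                        throw ex instanceof CompletionException
                                ? (CompletionException) ex
                                : new CompletionException(ex);
                    } else {
                        // Success: close a half-open breaker and reset the failure count
                        if (executionState == State.HALF_OPEN) {
                            tryTransitionState(State.HALF_OPEN, State.CLOSED);
                        }
                        failureCount.set(0);
                        return result;
                    }
                });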
            } catch (Exception e) {
                recordFailure();
                throw e;
            }
        }

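        private void recordFailure() {
            long stamp = stateLock.writeLock();
            try {
                long failures = failureCount.incrementAndGet();
                lastFailureTime.set(System.currentTimeMillis());

                State current = state.get();
                if (current == State.HALF_OPEN) {
                    // The trial request failed: reopen at once and restart the reset timer.
                    // Without this the breaker would stay HALF_OPEN and keep letting calls through.
                    logger.warn("Circuit breaker re-opening after failed trial request");
                    state.set(State.OPEN);
                } else if (failures >= threshold && current == State.CLOSED) {
                    logger.warn("Circuit breaker opening after {} failures", failures);
                    state.set(State.OPEN);
                }
            } finally {
                stateLock.unlockWrite(stamp);
            }
        }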

        private boolean tryTransitionState(State fromState, State toState) {
            long stamp = stateLock.writeLock();
            try {
                if (state.get() == fromState) {
                    state.set(toState);
                    logger.info("Circuit breaker state changed from {} to {}", fromState, toState);
                    return true;
                }
                return false;
            } finally {
                stateLock.unlockWrite(stamp);
            }
        }

        // Read the current state with an optimistic read, falling back to a read lock
        public State getCurrentState() {
            long stamp = stateLock.tryOptimisticRead();
            State result = state.get();

            if (!stateLock.validate(stamp)) {
                stamp = stateLock.readLock();
                try {
                    result = state.get();
                } finally {
                    stateLock.unlockRead(stamp);
                }
            }

            return result;
        }
    }

    // Global exception handler
    static class GlobalExceptionHandler {
        private static final Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);

        public static <T> CompletableFuture<T> withExceptionHandling(CompletableFuture<T> future) {
            return future.exceptionally(e -> {
                Throwable cause = e instanceof CompletionException ? e.getCause() : e;

                if (cause instanceof ConsistencyException) {
                    logger.error("Consistency error: {}", cause.getMessage());
                } else if (cause instanceof IOException) {
                    logger.error("I/O error: {}", cause.getMessage());
                } else if (cause instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    logger.warn("Operation interrupted");
                } else {
                    logger.error("Unexpected error: {}", cause.getClass().getName(), cause);
                }

                throw new CompletionException(cause);
            });
        }
    }

    // Metrics collector
    private static class MetricsCollector {
        private final Counter writeRequests;
        private final Counter writeSuccess;
        private final Counter writeFailed;
        private final Counter writeRejected;
        private final Counter batchWrites;
        private final Counter batchWriteRequests;
        private final Counter readRequests;
        private final Map<ConsistencyLevel, Counter> readsByLevel = new EnumMap<>(ConsistencyLevel.class);
        private final Histogram writeLatency;
        private final Histogram batchWriteLatency;
        private final Map<ConsistencyLevel, Histogram> readLatency = new EnumMap<>(ConsistencyLevel.class);

        public MetricsCollector(String prefix) {
            this.writeRequests = Counter.build()
                .name(prefix + "_write_requests_total")
                .help("Total number of write requests").register();

            this.writeSuccess = Counter.build()
                .name(prefix + "_write_success_total")
                .help("Total number of successful writes").register();

            this.writeFailed = Counter.build()
                .name(prefix + "_write_failed_total")
                .help("Total number of failed writes").register();

            this.writeRejected = Counter.build()
                .name(prefix + "_write_rejected_total")
                .help("Total number of rejected writes").register();

            this.batchWrites = Counter.build()
                .name(prefix + "_batch_writes_total")
                .help("Total number of batch writes").register();

            this.batchWriteRequests = Counter.build()
                .name(prefix + "_batch_write_requests_total")
                .help("Total number of requests in batch writes").register();

            this.readRequests = Counter.build()
                .name(prefix + "_read_requests_total")
                .help("Total number of read requests").register();

            this.writeLatency = Histogram.build()
                .name(prefix + "_write_latency_ms")
                .help("Write latency in milliseconds").register();

            this.batchWriteLatency = Histogram.build()
                .name(prefix + "_batch_write_latency_ms")
                .help("Batch write latency in milliseconds").register();

            // Create a counter and a latency histogram for each consistency level
            for (ConsistencyLevel level : ConsistencyLevel.values()) {
                readsByLevel.put(level, Counter.build()
                    .name(prefix + "_reads_" + level.name().toLowerCase() + "_total")
                    .help("Total " + level + " reads").register());

                readLatency.put(level, Histogram.build()
                    .name(prefix + "_read_" + level.name().toLowerCase() + "_latency_ms")
                    .help(level + " read latency in milliseconds").register());
            }
        }

        public void recordSuccessfulWrite(long latencyMs) {
            writeRequests.inc();
            writeSuccess.inc();
            writeLatency.observe(latencyMs);
        }

        public void recordFailedWrite() {
            writeRequests.inc();
            writeFailed.inc();
        }

        public void recordRejectedWrite() {
            writeRequests.inc();
            writeRejected.inc();
        }

        public void recordSuccessfulBatchWrite(int batchSize, long latencyMs) {
            batchWrites.inc();
            batchWriteRequests.inc(batchSize);
            writeRequests.inc(batchSize);
            writeSuccess.inc(batchSize);
            batchWriteLatency.observe(latencyMs);
        }

        public void recordFailedBatchWrite(int batchSize) {
            batchWrites.inc();
            batchWriteRequests.inc(batchSize);
            writeRequests.inc(batchSize);
            writeFailed.inc(batchSize);
        }

        public void recordRejectedBatchWrite(int batchSize) {
            batchWrites.inc();
            batchWriteRequests.inc(batchSize);
            writeRequests.inc(batchSize);
            writeRejected.inc(batchSize);
        }

        public void recordRead(ConsistencyLevel level, long latencyMs) {
            readRequests.inc();
            readsByLevel.get(level).inc();
            readLatency.get(level).observe(latencyMs);
        }

        public void recordFailedRead(ConsistencyLevel level) {
            readRequests.inc();
            // A dedicated failed-read counter (optionally per level) could be added here
        }
    }

    // Exception classes
    public static class CircuitBreakerOpenException extends Exception {
        public CircuitBreakerOpenException(String message) {
            super(message);
        }
    }

    public static class ConsistencyException extends RuntimeException {
        public ConsistencyException(String message) {
            super(message);
        }
    }

    public static class ProcessingException extends RuntimeException {
        public ProcessingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Other inner classes and constants...

    enum ConsistencyLevel {
        LINEARIZABLE,      // linearizability (strongest)
        SEQUENTIAL,        // sequential consistency
        READ_YOUR_WRITES,  // read-your-writes
        BOUNDED_STALENESS, // bounded staleness
        EVENTUAL           // eventual consistency (weakest)
    }
}
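The ReadYourWritesStrategy above waits a fixed 100 ms after a write, which only makes a stale read less likely. What follows is a minimal sketch of a zxid-based alternative. It assumes the write path can report the zxid at which each session's write committed, and that the apply loop calls back after every applied zxid. The class and method names (ZxidReadYourWrites, recordWrite, onApplied, read) and the session ids are hypothetical; they are not part of ZooKeeper's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: read-your-writes gated on zxids instead of elapsed time
public class ZxidReadYourWrites {
    private long lastAppliedZxid = 0;                                       // guarded by this
    private final TreeMap<Long, List<Runnable>> waiting = new TreeMap<>();  // guarded by this
    // Highest zxid written by each (hypothetical) client session
    private final Map<String, Long> lastWriteZxid = new ConcurrentHashMap<>();

    // Called once a write issued by `session` has been committed at `zxid`
    public void recordWrite(String session, long zxid) {
        lastWriteZxid.merge(session, zxid, Math::max);
    }

    // Called by the apply loop after the state machine has applied `zxid`
    public void onApplied(long zxid) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            lastAppliedZxid = Math.max(lastAppliedZxid, zxid);
            // Every waiter whose required zxid is now applied
            Map<Long, List<Runnable>> due = waiting.headMap(lastAppliedZxid, true);
            due.values().forEach(ready::addAll);
            due.clear();
        }
        ready.forEach(Runnable::run); // run the deferred reads outside the lock
    }

    // Completes once the replica has applied the session's last write
    public <T> CompletableFuture<T> read(String session, Supplier<T> readLocal) {
        long required = lastWriteZxid.getOrDefault(session, 0L);
        CompletableFuture<T> result = new CompletableFuture<>();
        Runnable doRead = () -> {
            try {
                result.complete(readLocal.get());
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        };
        boolean readNow;
        synchronized (this) {
            readNow = lastAppliedZxid >= required;
            if (!readNow) {
                waiting.computeIfAbsent(required, k -> new ArrayList<>()).add(doRead);
            }
        }
        if (readNow) {
            doRead.run();
        }
        return result;
    }

    public static void main(String[] args) {
        ZxidReadYourWrites rw = new ZxidReadYourWrites();
        rw.recordWrite("s1", 0x100000005L);               // epoch 1, counter 5
        CompletableFuture<String> r = rw.read("s1", () -> "value");
        System.out.println(r.isDone());                   // false
        rw.onApplied(0x100000004L);
        System.out.println(r.isDone());                   // false
        rw.onApplied(0x100000005L);
        System.out.println(r.join());                     // value
        System.out.println(rw.read("s2", () -> "other").isDone()); // true
    }
}
```

Because the wait is tied to zxids rather than time, a read is delayed exactly as long as the local replica lags behind the session's own writes. No thread sleeps: a deferred read is completed by whichever thread applies the required zxid.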

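The lease in LinearizableReadStrategy is only safe under two conditions. Its expiry has to be counted from the moment the leader started the confirmation round, not from when the acknowledgements arrived. It should also be measured on a monotonic clock. The sketch below shows that arithmetic under those assumptions. LeaderLease, beginRound and onQuorumAck are hypothetical names, and the clock is injected so the example is deterministic. The sketch presumes that followers will not elect a new leader within the lease period and that clock rates drift only slightly. ZooKeeper itself does not serve reads under a leader lease; clients that need a fresher view call sync() before reading.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a leader read lease measured on a monotonic clock
public class LeaderLease {
    private final long leaseNanos;
    private final LongSupplier clock;   // System::nanoTime in production; injectable for tests
    private long expiresAt;             // guarded by this
    private boolean held = false;       // guarded by this

    public LeaderLease(long leaseMillis, LongSupplier clock) {
        this.leaseNanos = TimeUnit.MILLISECONDS.toNanos(leaseMillis);
        this.clock = clock;
    }

    // Call BEFORE sending the confirmation (heartbeat) round to followers
    public long beginRound() {
        return clock.getAsLong();
    }

    // Call once a quorum has acknowledged the round that started at `roundStart`
    public synchronized void onQuorumAck(long roundStart) {
        long candidate = roundStart + leaseNanos;
        if (!held || candidate - expiresAt > 0) { // never shorten an existing lease
            expiresAt = candidate;
            held = true;
        }
    }

    // Local reads count as linearizable only while this returns true
    public synchronized boolean isValid() {
        return held && clock.getAsLong() - expiresAt < 0; // overflow-safe nanoTime comparison
    }

    public static void main(String[] args) {
        long[] now = {1_000_000_000L};
        LeaderLease lease = new LeaderLease(5000, () -> now[0]);
        System.out.println(lease.isValid());          // false: no quorum round yet
        long start = lease.beginRound();
        now[0] += TimeUnit.MILLISECONDS.toNanos(40);  // acknowledgements arrive 40 ms later
        lease.onQuorumAck(start);
        now[0] += TimeUnit.MILLISECONDS.toNanos(4950);
        System.out.println(lease.isValid());          // true: 4990 ms after the round began
        now[0] += TimeUnit.MILLISECONDS.toNanos(20);
        System.out.println(lease.isValid());          // false: 5010 ms after the round began
    }
}
```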
Fast Leader Election Algorithm
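The class below ranks votes by zxid and breaks ties with String.compareTo on serverId. For comparison, ZooKeeper's own FastLeaderElection orders proposals by peer epoch, then zxid, then the numeric myid. The small sketch that follows reproduces that ordering with a hypothetical Candidate type. It also shows why lexicographically compared string ids can invert the tie-break.

```java
import java.util.Comparator;

// Hypothetical sketch of ZooKeeper-style vote ordering
public class VoteOrder {
    static final class Candidate {
        final long myid;       // numeric server id
        final long peerEpoch;  // epoch the candidate last accepted
        final long zxid;       // candidate's last logged zxid

        Candidate(long myid, long peerEpoch, long zxid) {
            this.myid = myid;
            this.peerEpoch = peerEpoch;
            this.zxid = zxid;
        }
    }

    // Larger peer epoch wins, then larger zxid, then larger numeric id
    static final Comparator<Candidate> ORDER = Comparator
            .comparingLong((Candidate c) -> c.peerEpoch)
            .thenComparingLong(c -> c.zxid)
            .thenComparingLong(c -> c.myid);

    // True when `proposed` should replace `current` as this server's vote
    static boolean shouldSwitch(Candidate current, Candidate proposed) {
        return ORDER.compare(proposed, current) > 0;
    }

    public static void main(String[] args) {
        Candidate s9 = new Candidate(9, 1, 0x100000010L);
        Candidate s10 = new Candidate(10, 1, 0x100000010L);
        Candidate s2 = new Candidate(2, 1, 0x100000011L);
        System.out.println(shouldSwitch(s9, s10));     // true: same epoch and zxid, 10 > 9
        System.out.println(shouldSwitch(s10, s2));     // true: larger zxid wins over id
        System.out.println("9".compareTo("10") > 0);   // true: as strings, "9" outranks "10"
    }
}
```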

public class FastLeaderElection {
    private final AtomicLong logicalClock = new AtomicLong(0);
    private final ConcurrentMap<String, Vote> receivedVotes = new ConcurrentHashMap<>();
    private final String serverId;
    private final NetworkManager networkManager;
    private final int quorumSize;
    private final AtomicInteger electionAttempts = new AtomicInteger(0);
    private final Logger logger = LoggerFactory.getLogger(FastLeaderElection.class);
    private final ZxidUtils zxidUtils;

    public FastLeaderElection(String serverId, int quorumSize,
                            NetworkManager networkManager, ZxidUtils zxidUtils) {
        this.serverId = serverId;
        this.quorumSize = quorumSize;
        this.networkManager = networkManager;
        this.zxidUtils = zxidUtils;
    }

    public String lookForLeader() throws InterruptedException {
        MDC.put("component", "fast-leader-election");
        MDC.put("serverId", serverId);
        try {
            // Retry in a loop rather than by recursion, so repeated failures neither grow
            // the stack nor wrap the same exception several times
            while (true) {
                String leader = runElectionRound();
                if (leader != null) {
                    electionAttempts.set(0); // reset the attempt counter
                    return leader;
                }
                logger.warn("Failed to elect a leader, retrying...");
                // Exponential backoff to avoid livelock
                handleElectionFailure();
            }
        } catch (InterruptedException e) {
            throw e; // let interruption propagate unchanged
        } catch (Exception e) {
            logger.error("Error during leader election", e);
            // Count the failed attempt and back off
            handleElectionFailure();
            throw new LeaderElectionException("Leader election failed", e);
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
        }
    }

    // One election round. Returns the elected leader's id, or null if no candidate
    // gathered a quorum before the round timed out.
    private String runElectionRound() throws Exception {
        // Advance the logical clock (election round)
        long round = logicalClock.incrementAndGet();
        logger.info("Starting leader election with logical clock: {}", round);

        // Initial vote: propose ourselves
        Vote vote = new Vote(serverId, serverId, zxidUtils.getLastZxid(), round);
        // receivedVotes is keyed by VOTER id, so each server's newest vote replaces its older one
        receivedVotes.clear();
        receivedVotes.put(serverId, vote);
        networkManager.broadcastVote(vote);

        long startTime = System.currentTimeMillis();
        long maxTimeout = 60000; // at most 60 s per round

        while (System.currentTimeMillis() - startTime < maxTimeout) {
            if (Thread.interrupted()) {
                throw new InterruptedException("Leader election interrupted");
            }
            Vote receivedVote = networkManager.receiveVote(200); // 200 ms poll timeout
            if (receivedVote == null || serverId.equals(receivedVote.getVoterId())) {
                continue; // nothing received, or our own vote echoed back
            }

            MDC.put("candidateId", receivedVote.getServerId());
            try {
                logger.debug("Received vote from {} for {}: zxid={}, logicalClock={}",
                           receivedVote.getVoterId(), receivedVote.getServerId(),
                           Long.toHexString(receivedVote.getZxid()),
                           receivedVote.getLogicalClock());

                boolean mustBroadcast = false;
                if (receivedVote.getLogicalClock() > round) {
                    // A newer round is in progress: join it at the SAME clock value
                    // (incrementing again would force the peers to restart in turn),
                    // discard votes from the old round and start again from a self-vote
                    round = receivedVote.getLogicalClock();
                    logicalClock.set(round);
                    receivedVotes.clear();
                    vote = new Vote(serverId, serverId, zxidUtils.getLastZxid(), round);
                    receivedVotes.put(serverId, vote);
                    mustBroadcast = true;
                    logger.info("Found higher logical clock: {}, joining that round", round);
                } else if (receivedVote.getLogicalClock() < round) {
                    // Ignore votes from older rounds
                    logger.debug("Ignoring vote with older logical clock: {}",
                               receivedVote.getLogicalClock());
                    continue;
                }

                // If the received vote proposes a better candidate, adopt it
                if (compareVotes(vote, receivedVote) < 0) {
                    vote = new Vote(serverId, receivedVote.getServerId(),
                                    receivedVote.getZxid(), round);
                    receivedVotes.put(serverId, vote);
                    mustBroadcast = true;
                    logger.info("Updated vote to server: {}", vote.getServerId());
                }
                if (mustBroadcast) {
                    networkManager.broadcastVote(vote);
                }

                // Record the sender's latest vote
                receivedVotes.put(receivedVote.getVoterId(), receivedVote);

                // Count the voters that back the candidate we currently support
                String candidate = vote.getServerId();
                int support = 0;
                for (Vote v : receivedVotes.values()) {
                    if (v.getServerId().equals(candidate)) {
                        support++;
                    }
                }
                if (support >= quorumSize) {
                    logger.info("Elected leader: {} with {} votes of {} required",
                               candidate, support, quorumSize);
                    return candidate;
                }
            } finally {
                MDC.remove("candidateId");
            }
        }
        return null; // timed out without a quorum
    }

    // Handle a failed election: exponential backoff with jitter to avoid livelock
    private void handleElectionFailure() {
        int attempts = electionAttempts.incrementAndGet();
        // Exponential backoff: 2 s, 4 s, 8 s, 16 s, then capped at 30 s
        int backoffMs = Math.min(1000 * (1 << Math.min(attempts, 10)), 30000);
        // Add random jitter (up to 50%) so servers do not retry in lockstep
        backoffMs += ThreadLocalRandom.current().nextInt(backoffMs / 2);
        logger.info("Election attempt {} failed, backing off for {}ms", attempts, backoffMs);
        try {
            Thread.sleep(backoffMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted during election backoff");
        }
    }

    // Compare two votes: negative means v2 is better, 0 means equal, positive means v1 is better
    private int compareVotes(Vote v1, Vote v2) {
        // Larger zxid wins first. A zxid is (epoch << 32) | counter, so comparing the
        // raw longs orders by epoch and then by counter.
        int byZxid = Long.compare(v1.getZxid(), v2.getZxid());
        if (byZxid != 0) {
            return byZxid;
        }

        // Equal zxids: break the tie by serverId. String order is lexicographic
        // ("9" sorts after "10"); compare numerically if servers are numbered.
        return v1.getServerId().compareTo(v2.getServerId());
    }

    // Inner classes and helpers...

    static class Vote {
        private final String voterId;     // server that cast this vote (the sender)
        private final String serverId;    // proposed leader
        private final long zxid;          // last zxid of the proposed leader
        private final long logicalClock;  // election round

        // NetworkManager (not shown) is assumed to set voterId to the sender's id
        // when it decodes an incoming vote
        public Vote(String voterId, String serverId, long zxid, long logicalClock) {
            this.voterId = voterId;
            this.serverId = serverId;
            this.zxid = zxid;
            this.logicalClock = logicalClock;
        }

        public String getVoterId() {
            return voterId;
        }

        // Id of the proposed leader
        public String getServerId() {
            return serverId;
        }

        public long getZxid() {
            return zxid;
        }

        public long getLogicalClock() {
            return logicalClock;
        }

        @Override
        public String toString() {
            return "Vote{voterId='" + voterId + "', serverId='" + serverId +
                   "', zxid=" + Long.toHexString(zxid) +
                   ", logicalClock=" + logicalClock + '}';
        }
    }

    // Custom exception class
    public static class LeaderElectionException extends RuntimeException {
        public LeaderElectionException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
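A quorum tally only works if votes are stored per voter. Each server's newest vote must replace its previous one, and each server must count exactly once toward whichever candidate it currently backs. The sketch below isolates that bookkeeping. QuorumTally and its methods are hypothetical names, and the quorum is taken to be a strict majority of a fixed-size ensemble.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: keep each voter's latest vote and detect a quorum
public class QuorumTally {
    private final int ensembleSize;
    // voter id -> candidate that voter currently proposes
    private final Map<String, String> latestVote = new HashMap<>();

    public QuorumTally(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Strict majority of the ensemble
    public int quorumSize() {
        return ensembleSize / 2 + 1;
    }

    // Record a vote (replacing the voter's earlier one) and report the candidate if it now has a quorum
    public Optional<String> record(String voterId, String candidateId) {
        latestVote.put(voterId, candidateId);
        long support = latestVote.values().stream().filter(candidateId::equals).count();
        return support >= quorumSize() ? Optional.of(candidateId) : Optional.empty();
    }

    public static void main(String[] args) {
        QuorumTally tally = new QuorumTally(5);       // quorum = 3
        System.out.println(tally.record("1", "3"));   // Optional.empty
        System.out.println(tally.record("2", "3"));   // Optional.empty
        System.out.println(tally.record("2", "4"));   // Optional.empty: voter 2 switched
        System.out.println(tally.record("3", "3"));   // Optional.empty: only voters 1 and 3 back "3"
        System.out.println(tally.record("4", "3"));   // Optional[3]
    }
}
```

If the map were keyed by candidate instead, every candidate would hold exactly one entry. The tally could then never exceed one, so no leader would be elected unless the quorum were 1.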

Network Client Implementation Example
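The client below frames every message with a 4-byte length prefix. LengthFieldPrepender(4) adds the prefix on the way out, and LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) strips it on the way in. The dependency-free sketch below mimics that wire format with java.nio.ByteBuffer to make the size limit concrete; LengthPrefixFraming is a hypothetical name. As in Netty's decoder, the 4-byte length field counts toward the maximum frame length here, so a 1 MiB payload does not fit in a 1 MiB frame.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of 4-byte length-prefixed framing
public class LengthPrefixFraming {
    // Outbound: 4-byte big-endian length followed by the payload
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length).put(payload);
        frame.flip(); // switch from writing to reading
        return frame;
    }

    // Inbound: returns the next payload with the length field stripped, or null when
    // the buffer does not yet hold a complete frame
    static byte[] decode(ByteBuffer in, int maxFrameLength) {
        if (in.remaining() < 4) {
            return null;
        }
        int length = in.getInt(in.position());   // peek at the length without consuming it
        if (length < 0) {
            throw new IllegalStateException("negative frame length: " + length);
        }
        long frameLength = 4L + length;          // the length field counts toward the limit
        if (frameLength > maxFrameLength) {
            throw new IllegalStateException("frame too long: " + frameLength);
        }
        if (in.remaining() < frameLength) {
            return null;                         // wait for more bytes
        }
        in.position(in.position() + 4);          // strip the length field
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] msg = "PROPOSAL".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = encode(msg);                              // 12 bytes: 4 + 8
        ByteBuffer partial = ByteBuffer.wrap(wire.array(), 0, 6);   // only 6 bytes arrived
        System.out.println(decode(partial, 1 << 20));               // null
        System.out.println(new String(decode(wire, 1 << 20), StandardCharsets.UTF_8)); // PROPOSAL
        try {
            decode(encode(new byte[1 << 20]), 1 << 20);             // 1 MiB payload + header
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());                     // frame too long: 1048580
        }
    }
}
```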

public class NettyNetworkClient implements NetworkClient {
    private final EventLoopGroup workerGroup;
    private final Bootstrap bootstrap;
    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
    private final int connectionTimeoutMs;
    private final Logger logger = LoggerFactory.getLogger(NettyNetworkClient.class);

    public NettyNetworkClient(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.workerGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap()
            .group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMs)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                        .addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
                        .addLast(new LengthFieldPrepender(4))
                        .addLast(new PacketEncoder())
                        .addLast(new PacketDecoder())
                        .addLast(new ClientHandler());
                }
            });
    }

    @Override
    public void connect(String serverId, String address, int port) throws IOException {
        ChannelFuture future = bootstrap.connect(address, port);
        try {
            boolean connected = future.await(connectionTimeoutMs, TimeUnit.MILLISECONDS);

            if (!connected || !future.isSuccess()) {
                // Close the channel so a connect attempt that completes later does not leak
                future.channel().close();
                throw new IOException("Failed to connect to " + serverId + " at " +
                                    address + ":" + port, future.cause());
            }

            // Replace (and close) any previous channel to the same server
            Channel previous = channels.put(serverId, future.channel());
            if (previous != null) {
                previous.close();
            }
            logger.info("Connected to server: {} at {}:{}", serverId, address, port);
        } catch (InterruptedException e) {
            future.channel().close();
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while connecting to " + serverId, e);
        } catch (RuntimeException e) {
            future.channel().close();
            throw new IOException("Failed to connect to " + serverId, e);
        }
    }

    @Override
    public void disconnect(String serverId) {
        Channel channel = channels.remove(serverId);
        if (channel != null) {
            channel.close();
            logger.info("Disconnected from server: {}", serverId);
        }
    }

    @Override
    public ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);
            RequestFuture<ACK> future = new RequestFuture<>();

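            // Register the request-to-response mapping.
            // On timeout or failure the registry entry should also be removed; otherwise
            // pending futures accumulate (RequestRegistry is not shown here).
            Long requestId = generateRequestId();
            RequestRegistry.register(requestId, future);

            // Wrap the payload in a request envelope
            Request request = new Request(requestId, RequestType.PROPOSAL, proposal);

            // Send and wait until the write has been flushed
            channel.writeAndFlush(request).sync();

            // Wait for the follower's ACK (5 s timeout)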
            ACK ack = future.get(5, TimeUnit.SECONDS);
            if (ack == null) {
                throw new IOException("Received null ACK from " + serverId);
            }

            return ack;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while sending proposal to " + serverId, e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for ACK from " + serverId, e);
        } catch (ExecutionException e) {
            throw new IOException("Error sending proposal to " + serverId, e.getCause());
        } finally {
            MDC.remove("targetServerId");
        }
    }

    @Override
    public void sendCommit(String serverId, CommitPacket commit) throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);

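            // Wrap the payload in a request envelope
            Request request = new Request(generateRequestId(), RequestType.COMMIT, commit);

            // Fire and forget: do not wait for a response, but log write failures
            // instead of dropping them silently
            channel.writeAndFlush(request).addListener(f -> {
                if (!f.isSuccess()) {
                    logger.warn("Failed to send commit to {}", serverId, f.cause());
                }
            });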
        } catch (Exception e) {
            throw new IOException("Error sending commit to " + serverId, e);
        } finally {
            MDC.remove("targetServerId");
        }
    }

    @Override
    public LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt)
            throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);
            RequestFuture<LastZxidResponse> future = new RequestFuture<>();

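            // Register the request-to-response mapping
            Long requestId = generateRequestId();
            RequestRegistry.register(requestId, future);

            // Wrap the payload in a request envelope
            Request request = new Request(requestId, RequestType.EPOCH, epochPkt);

            // Send and wait until the write has been flushed
            channel.writeAndFlush(request).sync();

            // Wait for the response (5 s timeout)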
            LastZxidResponse response = future.get(5, TimeUnit.SECONDS);
            if (response == null) {
                throw new IOException("Received null LastZxidResponse from " + serverId);
            }

            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while sending epoch request to " + serverId, e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for LastZxidResponse from " + serverId, e);
        } catch (ExecutionException e) {
            throw new IOException("Error sending epoch request to " + serverId, e.getCause());
        } finally {
            MDC.remove("targetServerId");
        }
    }

    // Implement the remaining interface methods...

    @Override
    public void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);

            // Snapshots can be large, so send them in chunks. Each chunk plus its request
            // envelope must fit in one frame. Assuming the peer uses the same
            // LengthFieldBasedFrameDecoder(1048576, ...) as this client, frames over 1 MiB
            // (length field included) are rejected, so a full 1 MiB chunk would not fit.
            int chunkSize = 512 * 1024; // 512 KiB chunks leave headroom for the envelope
            int totalChunks = (snapshot.length + chunkSize - 1) / chunkSize;

            logger.info("Sending snapshot to {}, size: {} bytes, chunks: {}",
                       serverId, snapshot.length, totalChunks);

            // Send the snapshot metadata first
            SnapshotMetadata metadata = new SnapshotMetadata(zxid, snapshot.length, totalChunks);
            Request metadataRequest = new Request(generateRequestId(),
                                               RequestType.SNAPSHOT_META, metadata);
            channel.writeAndFlush(metadataRequest).sync();

            // Then send the snapshot data chunk by chunk
            for (int i = 0; i < totalChunks; i++) {
                int offset = i * chunkSize;
                int length = Math.min(chunkSize, snapshot.length - offset);
                byte[] chunk = new byte[length];
                System.arraycopy(snapshot, offset, chunk, 0, length);

                SnapshotChunk snapshotChunk = new SnapshotChunk(i, totalChunks, chunk);
                Request chunkRequest = new Request(generateRequestId(),
                                                RequestType.SNAPSHOT_CHUNK, snapshotChunk);

                channel.writeAndFlush(chunkRequest).sync();

                if (i % 10 == 0 || i == totalChunks - 1) {
                    logger.debug("Sent snapshot chunk {}/{} to {}",
                               i + 1, totalChunks, serverId);
                }
            }

            logger.info("Snapshot sent successfully to {}", serverId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while sending snapshot to " + serverId, e);
        } catch (Exception e) {
            throw new IOException("Error sending snapshot to " + serverId, e);
        } finally {
            MDC.remove("targetServerId");
        }
    }

    // Get the active channel to the given server
    private Channel getChannel(String serverId) throws IOException {
        Channel channel = channels.get(serverId);
        if (channel == null || !channel.isActive()) {
            throw new IOException("No active connection to server: " + serverId);
        }
        return channel;
    }

    // Generate a unique request ID
    private static final AtomicLong requestIdGenerator = new AtomicLong(0);

    private static Long generateRequestId() {
        return requestIdGenerator.incrementAndGet();
    }

    // Shut down the client
    public void shutdown() {
        // Close all connections
        for (Channel channel : channels.values()) {
            channel.close();
        }
        channels.clear();

        // Shut down the event loop group
        workerGroup.shutdownGracefully();
    }

    // Request types
    enum RequestType {
        PROPOSAL, COMMIT, EPOCH, TRUNCATE, TRANSACTION, NEWLEADER, HEARTBEAT,
        SNAPSHOT_META, SNAPSHOT_CHUNK
    }

    // Request envelope
    static class Request {
        private final Long id;
        private final RequestType type;
        private final Object payload;

        public Request(Long id, RequestType type, Object payload) {
            this.id = id;
            this.type = type;
            this.payload = payload;
        }

        public Long getId() {
            return id;
        }

        public RequestType getType() {
            return type;
        }

        public Object getPayload() {
            return payload;
        }
    }

    // Snapshot metadata
    static class SnapshotMetadata {
        private final long zxid;
        private final int totalSize;
        private final int totalChunks;

        public SnapshotMetadata(long zxid, int totalSize, int totalChunks) {
            this.zxid = zxid;
            this.totalSize = totalSize;
            this.totalChunks = totalChunks;
        }

        public long getZxid() {
            return zxid;
        }

        public int getTotalSize() {
            return totalSize;
        }

        public int getTotalChunks() {
            return totalChunks;
        }
    }

    // A single snapshot chunk
    static class SnapshotChunk {
        private final int chunkIndex;
        private final int totalChunks;
        private final byte[] data;

        public SnapshotChunk(int chunkIndex, int totalChunks, byte[] data) {
            this.chunkIndex = chunkIndex;
            this.totalChunks = totalChunks;
            this.data = data.clone(); // defensive copy
        }

        public int getChunkIndex() {
            return chunkIndex;
        }

        public int getTotalChunks() {
            return totalChunks;
        }

        public byte[] getData() {
            return data.clone(); // defensive copy
        }
    }

    // Registry mapping request IDs to pending response futures
    static class RequestRegistry {
        private static final ConcurrentMap<Long, RequestFuture<?>> futures = new ConcurrentHashMap<>();

        public static <T> void register(Long requestId, RequestFuture<T> future) {
            futures.put(requestId, future);
        }

        @SuppressWarnings("unchecked")
        public static <T> void complete(Long requestId, T response) {
            RequestFuture<T> future = (RequestFuture<T>) futures.remove(requestId);
            if (future != null) {
                future.complete(response);
            }
        }

        public static void completeExceptionally(Long requestId, Throwable exception) {
            RequestFuture<?> future = futures.remove(requestId);
            if (future != null) {
                future.completeExceptionally(exception);
            }
        }
    }

    // Future for a pending request
    static class RequestFuture<T> extends CompletableFuture<T> {
        // Inherits everything from CompletableFuture; no extra code is needed
    }

    // Client-side inbound handler: completes pending futures when responses arrive
    private class ClientHandler extends SimpleChannelInboundHandler<Response> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Response response) {
            Long requestId = response.getRequestId();
            if (response.isSuccess()) {
                RequestRegistry.complete(requestId, response.getPayload());
            } else {
                RequestRegistry.completeExceptionally(requestId,
                    new IOException("Request failed: " + response.getErrorMessage()));
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.error("Network client exception", cause);
            ctx.close();
        }
    }

    // Response envelope
    static class Response {
        private final Long requestId;
        private final boolean success;
        private final Object payload;
        private final String errorMessage;

        public Response(Long requestId, boolean success, Object payload, String errorMessage) {
            this.requestId = requestId;
            this.success = success;
            this.payload = payload;
            this.errorMessage = errorMessage;
        }

        public Long getRequestId() {
            return requestId;
        }

        public boolean isSuccess() {
            return success;
        }

        public Object getPayload() {
            return payload;
        }

        public String getErrorMessage() {
            return errorMessage;
        }
    }

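    // Encoder: writes each Request as a 4-byte length prefix followed by the serialized body,
    // the same length-prefixed framing that PacketDecoder expects
    static class PacketEncoder extends MessageToByteEncoder<Request> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Request msg, ByteBuf out) throws Exception {
            // Use Protocol Buffers or a custom serializer in practice; simplified here
            byte[] bytes = serializeRequest(msg);
            out.writeInt(bytes.length); // length prefix, so the peer can find frame boundaries
            out.writeBytes(bytes);
        }

        private byte[] serializeRequest(Request request) {
            // Placeholder: a real implementation needs a proper serialization format
            return new byte[0];
        }
    }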

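    // Decoder: reads length-prefixed frames and turns each one into a Response
    static class PacketDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // Use Protocol Buffers or a custom deserializer in practice; simplified here
            if (in.readableBytes() < 4) { // need at least the 4-byte length field
                return;
            }
            in.markReaderIndex();
            int length = in.readInt();
            if (length < 0) {
                throw new IOException("Corrupted frame: negative length " + length);
            }

            if (in.readableBytes() < length) {
                in.resetReaderIndex(); // wait until the whole frame has arrived
                return;
            }

            byte[] data = new byte[length];
            in.readBytes(data);

            Response response = deserializeResponse(data);
            if (response != null) { // skip null results from the placeholder deserializer below
                out.add(response);
            }
        }

        private Response deserializeResponse(byte[] data) {
            // Placeholder: a real implementation needs a proper deserialization format
            return null;
        }
    }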
}
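PacketDecoder above assumes that every frame begins with a 4-byte length field. The standalone sketch below shows that framing without Netty, using only `java.nio.ByteBuffer`. `encode` prepends the length, and `feed` buffers partial input until a whole frame has arrived, which is the job that `markReaderIndex`/`resetReaderIndex` perform in the decoder. `FrameCodec` and `FrameCodecDemo` are hypothetical helpers written for illustration; they are not part of Netty or ZooKeeper.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: 4-byte big-endian length prefix + payload, as PacketDecoder expects.
final class FrameCodec {
    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Prepend the payload length as a 4-byte int (ByteBuffer is big-endian by default).
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Append newly received bytes and return every frame that is now complete.
    List<byte[]> feed(byte[] received) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + received.length);
        merged.put(pending);
        merged.put(received);
        merged.flip();

        List<byte[]> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();                 // like in.markReaderIndex()
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalStateException("Negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                merged.reset();            // like in.resetReaderIndex(): wait for more bytes
                break;
            }
            byte[] frame = new byte[length];
            merged.get(frame);
            frames.add(frame);
        }
        pending = merged.slice();          // keep only the unconsumed tail
        return frames;
    }
}

class FrameCodecDemo {
    public static void main(String[] args) {
        byte[] wire = FrameCodec.encode("hello".getBytes(StandardCharsets.UTF_8));
        FrameCodec codec = new FrameCodec();
        // Deliver the 9-byte frame in two pieces: 6 bytes, then the remaining 3.
        byte[] head = new byte[6];
        byte[] tail = new byte[wire.length - 6];
        System.arraycopy(wire, 0, head, 0, head.length);
        System.arraycopy(wire, 6, tail, 0, tail.length);
        System.out.println(codec.feed(head).size()); // 0: frame still incomplete
        for (byte[] frame : codec.feed(tail)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8)); // hello
        }
    }
}
```

In Netty itself, `LengthFieldBasedFrameDecoder` and `LengthFieldPrepender` already implement this framing. They are usually a safer choice than a hand-written decoder.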

III. Paxos Algorithm Implementation

Core Interface Definitions

// Role interfaces
public interface Proposer {
    CompletableFuture<Boolean> prepare(int ballot);
    CompletableFuture<Boolean> propose(int ballot, Object value);
}

public interface Acceptor {
    CompletableFuture<Promise> handlePrepare(int ballot);
    CompletableFuture<Accepted> handleAccept(int ballot, Object value);
}

public interface Learner {
    void learn(long instanceId, int ballot, Object value);
}

public interface NetworkClient {
    CompletableFuture<Promise> sendPrepare(int nodeId, int ballot);
    CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value);
    void sendLearn(int nodeId, long instanceId, int ballot, Object value);
    CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot);
    CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId);
}

public interface StateMachine {
    CompletableFuture<Void> apply(long instanceId, byte[] command);
    long getLastApplied();
    CompletableFuture<byte[]> takeSnapshot();
    CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId);
}
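The Acceptor interface hides two rules that are easy to get wrong. First, an acceptor must keep the highest ballot it has *promised* separately from the ballot at which it last *accepted* a value. Second, a Prepare reply must carry the accepted pair so that the proposer can adopt it. The single-threaded sketch below isolates just those two rules. `PaxosAcceptorSketch` and `PromiseReply` are illustrative names that are not part of the interfaces above. State lives in memory only, whereas a real acceptor persists it before replying.

```java
// Illustrative in-memory acceptor; names are hypothetical, not part of the article's interfaces.
final class PaxosAcceptorSketch {
    private int promisedBallot = 0;  // highest ballot promised in phase 1 (0 = none)
    private int acceptedBallot = 0;  // ballot at which acceptedValue was accepted (0 = none)
    private Object acceptedValue = null;

    // Phase 1b reply: ok flag plus whatever was accepted before this Prepare.
    static final class PromiseReply {
        final boolean ok;
        final int acceptedBallot;
        final Object acceptedValue;

        PromiseReply(boolean ok, int acceptedBallot, Object acceptedValue) {
            this.ok = ok;
            this.acceptedBallot = acceptedBallot;
            this.acceptedValue = acceptedValue;
        }
    }

    // Promise only for a ballot strictly higher than every earlier promise.
    synchronized PromiseReply onPrepare(int ballot) {
        if (ballot <= promisedBallot) {
            return new PromiseReply(false, acceptedBallot, acceptedValue);
        }
        promisedBallot = ballot;
        return new PromiseReply(true, acceptedBallot, acceptedValue);
    }

    // Accept unless a higher ballot has been promised since.
    synchronized boolean onAccept(int ballot, Object value) {
        if (ballot < promisedBallot) {
            return false;
        }
        promisedBallot = ballot;
        acceptedBallot = ballot;
        acceptedValue = value;
        return true;
    }

    public static void main(String[] args) {
        PaxosAcceptorSketch acceptor = new PaxosAcceptorSketch();
        acceptor.onPrepare(5);
        acceptor.onAccept(5, "x");
        PromiseReply reply = acceptor.onPrepare(8);
        // The reply reports ballot 5 (where "x" was accepted), not the newly promised 8.
        System.out.println(reply.ok + " " + reply.acceptedBallot + " " + reply.acceptedValue); // true 5 x
        System.out.println(acceptor.onAccept(5, "y")); // false: ballot 8 has been promised
    }
}
```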

Basic Paxos Implementation

public class BasicPaxosNode implements Proposer, Acceptor, Learner, AutoCloseable {
    private final int nodeId;
    private final AtomicInteger ballot = new AtomicInteger(0);
    private volatile Object proposalValue = null;
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private volatile int acceptedBallot = 0;
    private volatile Object acceptedValue = null;
    private final int totalNodes;
    private final NetworkClient networkClient;
    private final Logger logger = LoggerFactory.getLogger(BasicPaxosNode.class);
    private final RetryStrategy retryStrategy;
    private final MetricsCollector metrics;

    public BasicPaxosNode(int nodeId, int totalNodes, NetworkClient networkClient) {
        this.nodeId = nodeId;
        this.totalNodes = totalNodes;
        this.networkClient = networkClient;
        this.retryStrategy = new ExponentialBackoffRetry(100, 5000, 3);
        this.metrics = new MetricsCollector("paxos_basic", nodeId);
    }

    // Proposer: phase 1 (prepare)
    @Override
    public CompletableFuture<Boolean> prepare(int suggestedBallot) {
        final int newBallot = suggestedBallot > 0 ? suggestedBallot : generateNewBallot();
        final Stopwatch stopwatch = Stopwatch.createStarted();

        CompletableFuture<Boolean> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            // MDC is thread-local, so populate it on the thread that does the work
            MDC.put("component", "paxos-proposer");
            MDC.put("nodeId", String.valueOf(nodeId));
            MDC.put("ballot", String.valueOf(newBallot));
            try {
                logger.info("Starting prepare phase with ballot {}", newBallot);

                // Send Prepare to every acceptor, including this node
                List<CompletableFuture<Promise>> futures = sendPrepare(newBallot);

                // Collect replies; an acceptor that fails or times out simply does not count
                List<Promise> promises = new ArrayList<>();
                for (CompletableFuture<Promise> future : futures) {
                    try {
                        Promise promise = future.get(3, TimeUnit.SECONDS);
                        if (promise != null) {
                            promises.add(promise);
                        }
                    } catch (Exception e) {
                        logger.warn("Error getting prepare response", e);
                    }
                }

                // Continue only if a majority promised
                int quorum = getQuorum();
                int okCount = (int) promises.stream().filter(Promise::isOk).count();

                if (okCount >= quorum) {
                    // Record the round so generateNewBallot() always moves past this ballot
                    ballot.updateAndGet(current -> Math.max(current, newBallot / totalNodes));

                    // Paxos rule: if any acceptor has already accepted a value, adopt the one
                    // with the highest accepted ballot; otherwise the proposer may use its own
                    Promise highestPromise = selectHighestBallotPromise(promises);
                    rwLock.writeLock().lock();
                    try {
                        // Reset on every round so a value found in an earlier round cannot leak in
                        proposalValue = highestPromise != null ? highestPromise.getAcceptedValue() : null;
                        if (proposalValue != null) {
                            logger.info("Using previously accepted value: {}", proposalValue);
                        }
                    } finally {
                        rwLock.writeLock().unlock();
                    }

                    metrics.recordPrepareSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS));
                    result.complete(true);
                } else {
                    logger.info("Failed to get quorum in prepare phase: {} of {} responses ok",
                               okCount, promises.size());
                    metrics.recordPrepareFailed();
                    result.complete(false);
                }
            } catch (Exception e) {
                logger.error("Error in prepare phase", e);
                metrics.recordPrepareFailed();
                result.completeExceptionally(e);
            } finally {
                MDC.remove("component");
                MDC.remove("nodeId");
                MDC.remove("ballot");
            }
        });

        return result;
    }

    // Proposer: phase 2 (accept). Runs phase 1 first, then asks the acceptors to accept a value
    @Override
    public CompletableFuture<Boolean> propose(int ballot, Object value) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        // Fix the ballot up front so phase 1 and phase 2 use the same number; passing 0 to
        // prepare() would make it pick a ballot that the accept phase never sees
        final int effectiveBallot = ballot > 0 ? ballot : generateNewBallot();

        return prepare(effectiveBallot).thenCompose(prepared -> {
            if (!prepared) {
                logger.info("Prepare phase failed for ballot {}, cannot proceed to propose",
                           effectiveBallot);
                metrics.recordProposeFailed();
                return CompletableFuture.completedFuture(false);
            }

            // Pick the value to send in phase 2
            final Object valueToPropose;
            rwLock.readLock().lock();
            try {
                // Use the caller's value only if phase 1 found no previously accepted value
                valueToPropose = proposalValue != null ? proposalValue : value;
            } finally {
                rwLock.readLock().unlock();
            }

            return CompletableFuture.supplyAsync(() -> {
                // MDC is thread-local, so populate it on the thread that does the work
                MDC.put("component", "paxos-proposer");
                MDC.put("nodeId", String.valueOf(nodeId));
                MDC.put("ballot", String.valueOf(effectiveBallot));
                try {
                    logger.info("Starting accept phase with ballot {} and value {}",
                               effectiveBallot, valueToPropose);

                    // Send Accept to every acceptor, including this node
                    List<CompletableFuture<Accepted>> futures = sendAccept(effectiveBallot, valueToPropose);

                    // Collect replies
                    List<Accepted> responses = new ArrayList<>();
                    for (CompletableFuture<Accepted> future : futures) {
                        try {
                            Accepted accepted = future.get(3, TimeUnit.SECONDS);
                            if (accepted != null) {
                                responses.add(accepted);
                            }
                        } catch (Exception e) {
                            logger.warn("Error getting accept response", e);
                        }
                    }

                    // The value is chosen once a majority of acceptors has accepted it
                    int quorum = getQuorum();
                    int acceptedCount = (int) responses.stream().filter(Accepted::isOk).count();
                    boolean success = acceptedCount >= quorum;

                    if (success) {
                        logger.info("Value {} has been accepted by a majority ({} of {})",
                                   valueToPropose, acceptedCount, responses.size());

                        // Notify all learners. Basic Paxos decides a single value,
                        // so the instance ID is fixed at 1 here
                        broadcastToLearners(1, effectiveBallot, valueToPropose);
                        metrics.recordProposeSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS));
                    } else {
                        logger.info("Failed to get quorum in accept phase: {} of {} responses ok",
                                   acceptedCount, responses.size());
                        metrics.recordProposeFailed();
                    }

                    return success;
                } catch (Exception e) {
                    logger.error("Error in propose phase", e);
                    // The failure is counted once, in exceptionally() below
                    throw new CompletionException(e);
                } finally {
                    MDC.remove("component");
                    MDC.remove("nodeId");
                    MDC.remove("ballot");
                }
            });
        }).exceptionally(e -> {
            logger.error("Failed to propose value", e);
            metrics.recordProposeFailed();
            return false;
        });
    }

    // Acceptor state: the highest ballot promised in phase 1. It is tracked separately from
    // acceptedBallot, which records the ballot at which acceptedValue was accepted.
    // A production acceptor must persist these fields before replying.
    private volatile int promisedBallot = 0;

    // Acceptor: handle a Prepare request (phase 1b)
    @Override
    public CompletableFuture<Promise> handlePrepare(int proposalBallot) {
        return CompletableFuture.supplyAsync(() -> {
            // MDC is thread-local, so populate it on the thread that does the work
            MDC.put("component", "paxos-acceptor");
            MDC.put("nodeId", String.valueOf(nodeId));
            MDC.put("ballot", String.valueOf(proposalBallot));
            Promise promise = new Promise();

            rwLock.writeLock().lock();
            try {
                if (proposalBallot > promisedBallot) {
                    // Promise not to accept any proposal numbered below proposalBallot
                    promisedBallot = proposalBallot;
                    promise.setOk(true);
                    // Report the previously accepted ballot/value (if any), not the new ballot,
                    // so the proposer can adopt the highest accepted value
                    promise.setAcceptedBallot(acceptedBallot);
                    promise.setAcceptedValue(acceptedValue);
                    logger.info("Acceptor {} promised ballot {}", nodeId, proposalBallot);
                    metrics.recordPromiseMade();
                } else {
                    promise.setOk(false);
                    logger.info("Acceptor {} rejected ballot {}, promised ballot: {}",
                               nodeId, proposalBallot, promisedBallot);
                    metrics.recordPromiseRejected();
                }
                return promise;
            } finally {
                rwLock.writeLock().unlock();
                MDC.remove("component");
                MDC.remove("nodeId");
                MDC.remove("ballot");
            }
        });
    }

    // Acceptor: handle an Accept request (phase 2b)
    @Override
    public CompletableFuture<Accepted> handleAccept(int proposalBallot, Object value) {
        return CompletableFuture.supplyAsync(() -> {
            MDC.put("component", "paxos-acceptor");
            MDC.put("nodeId", String.valueOf(nodeId));
            MDC.put("ballot", String.valueOf(proposalBallot));
            Accepted accepted = new Accepted();

            rwLock.writeLock().lock();
            try {
                // Accept unless a higher ballot has been promised in the meantime
                if (proposalBallot >= promisedBallot) {
                    promisedBallot = proposalBallot;
                    acceptedBallot = proposalBallot;
                    acceptedValue = value;
                    accepted.setOk(true);
                    logger.info("Acceptor {} accepted ballot {} with value {}",
                               nodeId, proposalBallot, value);
                    metrics.recordAcceptMade();
                } else {
                    accepted.setOk(false);
                    logger.info("Acceptor {} rejected accept for ballot {}, promised ballot: {}",
                               nodeId, proposalBallot, promisedBallot);
                    metrics.recordAcceptRejected();
                }
                return accepted;
            } finally {
                rwLock.writeLock().unlock();
                MDC.remove("component");
                MDC.remove("nodeId");
                MDC.remove("ballot");
            }
        });
    }

    // Learner: record a chosen value
    @Override
    public void learn(long instanceId, int ballot, Object value) {
        MDC.put("component", "paxos-learner");
        MDC.put("nodeId", String.valueOf(nodeId));
        MDC.put("instanceId", String.valueOf(instanceId));
        MDC.put("ballot", String.valueOf(ballot));

        try {
            logger.info("Learner {} learned value {} for instance {} with ballot {}",
                       nodeId, value, instanceId, ballot);
            metrics.recordLearnReceived();

            // A real implementation would apply the learned value to the state machine here
            // applyToStateMachine(instanceId, value);
        } finally {
            MDC.remove("component");
            MDC.remove("nodeId");
            MDC.remove("instanceId");
            MDC.remove("ballot");
        }
    }

    // Send Prepare to every acceptor
    private List<CompletableFuture<Promise>> sendPrepare(int newBallot) {
        List<CompletableFuture<Promise>> futures = new ArrayList<>();

        for (int i = 0; i < totalNodes; i++) {
            final int targetNodeId = i;
            if (targetNodeId == this.nodeId) {
                // Handle the request locally
                futures.add(handlePrepare(newBallot));
            } else {
                // Send the request to the remote node
                futures.add(networkClient.sendPrepare(targetNodeId, newBallot)
                    .exceptionally(e -> {
                        logger.error("Failed to send prepare to node {}", targetNodeId, e);
                        return null;
                    }));
            }
        }

        return futures;
    }

    // Send Accept to every acceptor
    private List<CompletableFuture<Accepted>> sendAccept(int ballot, Object value) {
        List<CompletableFuture<Accepted>> futures = new ArrayList<>();

        for (int i = 0; i < totalNodes; i++) {
            final int targetNodeId = i;
            if (targetNodeId == this.nodeId) {
                // Handle the request locally
                futures.add(handleAccept(ballot, value));
            } else {
                // Send the request to the remote node
                futures.add(networkClient.sendAccept(targetNodeId, ballot, value)
                    .exceptionally(e -> {
                        logger.error("Failed to send accept to node {}", targetNodeId, e);
                        return null;
                    }));
            }
        }

        return futures;
    }

    // Tell every learner which value was chosen
    private void broadcastToLearners(long instanceId, int ballot, Object value) {
        for (int i = 0; i < totalNodes; i++) {
            final int targetNodeId = i;
            if (targetNodeId == this.nodeId) {
                // Learn locally
                learn(instanceId, ballot, value);
            } else {
                // Notify remote learners asynchronously
                CompletableFuture.runAsync(() -> {
                    try {
                        networkClient.sendLearn(targetNodeId, instanceId, ballot, value);
                    } catch (Exception e) {
                        logger.error("Failed to notify learner {}", targetNodeId, e);
                    }
                });
            }
        }
    }

    // Pick the successful promise whose accepted value has the highest accepted ballot
    private Promise selectHighestBallotPromise(List<Promise> promises) {
        return promises.stream()
                      .filter(p -> p.isOk() && p.getAcceptedValue() != null)
                      .max(Comparator.comparingInt(Promise::getAcceptedBallot))
                      .orElse(null);
    }

    // Generate a new, larger ballot. round * totalNodes + nodeId keeps ballots
    // unique across nodes, assuming 0 <= nodeId < totalNodes
    private int generateNewBallot() {
        return ballot.incrementAndGet() * totalNodes + nodeId;
    }

    // Majority size: any two majorities share at least one acceptor
    private int getQuorum() {
        return totalNodes / 2 + 1;
    }

    @Override
    public void close() {
        // Release resources
        metrics.close();
    }

    // Promise: the phase 1b reply
    public static class Promise {
        private boolean ok;
        private int acceptedBallot;
        private Object acceptedValue;

        public boolean isOk() {
            return ok;
        }

        public void setOk(boolean ok) {
            this.ok = ok;
        }

        public int getAcceptedBallot() {
            return acceptedBallot;
        }

        public void setAcceptedBallot(int acceptedBallot) {
            this.acceptedBallot = acceptedBallot;
        }

        public Object getAcceptedValue() {
            return acceptedValue;
        }

        public void setAcceptedValue(Object acceptedValue) {
            this.acceptedValue = acceptedValue;
        }
    }

    // Accepted: the phase 2b reply
    public static class Accepted {
        private boolean ok;

        public boolean isOk() {
            return ok;
        }

        public void setOk(boolean ok) {
            this.ok = ok;
        }
    }

    // Metrics collector (stub)
    private static class MetricsCollector implements AutoCloseable {
        // Metric definitions...

        public MetricsCollector(String prefix, int nodeId) {
            // Initialize metrics...
        }

        public void recordPrepareSuccess(long latencyMs) {
            // Record a successful prepare phase
        }

        public void recordPrepareFailed() {
            // Record a failed prepare phase
        }

        public void recordProposeSuccess(long latencyMs) {
            // Record a successful propose phase
        }

        public void recordProposeFailed() {
            // Record a failed propose phase
        }

        public void recordPromiseMade() {
            // Count promises made
        }

        public void recordPromiseRejected() {
            // Count rejected prepares
        }

        public void recordAcceptMade() {
            // Count accepts made
        }

        public void recordAcceptRejected() {
            // Count rejected accepts
        }

        public void recordLearnReceived() {
            // Count learned values
        }

        @Override
        public void close() {
            // Release resources
        }
    }

    // Retry strategy for transient failures
    interface RetryStrategy {
        <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action);
    }

    // Exponential backoff with random jitter
    static class ExponentialBackoffRetry implements RetryStrategy {
        private final long initialBackoffMs;
        private final long maxBackoffMs;
        private final int maxRetries;
        private final Logger logger = LoggerFactory.getLogger(ExponentialBackoffRetry.class);

        public ExponentialBackoffRetry(long initialBackoffMs, long maxBackoffMs, int maxRetries) {
            this.initialBackoffMs = initialBackoffMs;
            this.maxBackoffMs = maxBackoffMs;
            this.maxRetries = maxRetries;
        }

        @Override
        public <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action) {
            return retryInternal(action, 0);
        }

        private <T> CompletableFuture<T> retryInternal(Supplier<CompletableFuture<T>> action,
                                                    int attempt) {
            // handle() sees both outcomes and returns the next future to wait on;
            // thenCompose(next -> next) flattens it (Java 11 has no exceptionallyCompose)
            return action.get().<CompletableFuture<T>>handle((result, error) -> {
                if (error == null) {
                    return CompletableFuture.completedFuture(result);
                }
                if (attempt >= maxRetries) {
                    return CompletableFuture.failedFuture(
                        new RetryExhaustedException("Max retries exceeded", error));
                }

                // initialBackoffMs * 2^attempt, capped at maxBackoffMs, plus up to 20% jitter
                long backoff = Math.min(initialBackoffMs << Math.min(attempt, 30), maxBackoffMs);
                backoff += ThreadLocalRandom.current().nextLong(backoff / 5 + 1);

                logger.info("Retry attempt {} after {}ms due to: {}",
                           attempt + 1, backoff, error.getMessage());

                // Schedule the next attempt without blocking a thread during the backoff
                return CompletableFuture.runAsync(() -> { },
                        CompletableFuture.delayedExecutor(backoff, TimeUnit.MILLISECONDS))
                    .thenCompose(ignored -> retryInternal(action, attempt + 1));
            }).thenCompose(next -> next);
        }
    }

    // Custom exception type
    public static class RetryExhaustedException extends RuntimeException {
        public RetryExhaustedException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
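`generateNewBallot()` and `getQuorum()` rely on two arithmetic facts. First, ballots of the form `round * totalNodes + nodeId` never collide between nodes, provided node IDs run from 0 to `totalNodes - 1` as the loops in `sendPrepare` assume. Second, any two majorities of size `totalNodes / 2 + 1` overlap in at least one acceptor, which is what lets a later proposer discover a value accepted earlier. The exhaustive check below confirms both properties for small clusters. `PaxosArithmetic` is a hypothetical helper, and brute force over subsets is only practical for a handful of nodes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper that brute-forces the two properties for small clusters.
final class PaxosArithmetic {
    // Same formula as generateNewBallot(): round * totalNodes + nodeId.
    static int ballot(int round, int totalNodes, int nodeId) {
        return round * totalNodes + nodeId;
    }

    // Same formula as getQuorum().
    static int quorum(int totalNodes) {
        return totalNodes / 2 + 1;
    }

    // True if no two (round, nodeId) pairs produce the same ballot.
    static boolean ballotsUnique(int totalNodes, int rounds) {
        Set<Integer> seen = new HashSet<>();
        for (int round = 1; round <= rounds; round++) {
            for (int nodeId = 0; nodeId < totalNodes; nodeId++) {
                if (!seen.add(ballot(round, totalNodes, nodeId))) {
                    return false;
                }
            }
        }
        return true;
    }

    // True if every pair of node subsets with at least quorumSize members intersects.
    // Subsets are represented as bitmasks over totalNodes nodes.
    static boolean quorumsIntersect(int totalNodes, int quorumSize) {
        int limit = 1 << totalNodes;
        for (int a = 0; a < limit; a++) {
            if (Integer.bitCount(a) < quorumSize) {
                continue;
            }
            for (int b = 0; b < limit; b++) {
                if (Integer.bitCount(b) >= quorumSize && (a & b) == 0) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 7; n++) {
            System.out.printf("n=%d quorum=%d intersect=%b uniqueBallots=%b%n",
                n, quorum(n), quorumsIntersect(n, quorum(n)), ballotsUnique(n, 100));
        }
        // Half the nodes is not enough: {0,1} and {2,3} are disjoint when n = 4.
        System.out.println(quorumsIntersect(4, 2)); // false
    }
}
```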

Multi-Paxos Implementation
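Multi-Paxos runs one Paxos instance per log slot. A leader may have several slots in flight at once, so slot 3 can be decided before slot 2. Replicas stay consistent only if they apply decided commands in slot order and never skip a gap. The sketch below isolates that rule, which `applyCommittedLogs()` in the following code is responsible for. `OrderedApplier` and its in-memory map are hypothetical simplifications of `MultiPaxosLog` and `MultiPaxosStateMachine`, whose internals are not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Multi-Paxos log plus state machine.
final class OrderedApplier {
    private final Map<Long, String> decided = new HashMap<>(); // instanceId -> decided command
    private final List<String> appliedCommands = new ArrayList<>();
    private long lastApplied = 0; // instances 1..lastApplied have been applied

    // Record that an instance was decided; decisions may arrive in any order.
    void decide(long instanceId, String command) {
        decided.put(instanceId, command);
    }

    // Apply decided instances strictly in order, stopping at the first gap.
    int applyReady() {
        int count = 0;
        while (decided.containsKey(lastApplied + 1)) {
            long next = lastApplied + 1;
            appliedCommands.add(decided.remove(next));
            lastApplied = next;
            count++;
        }
        return count;
    }

    long lastApplied() {
        return lastApplied;
    }

    List<String> appliedCommands() {
        return appliedCommands;
    }

    public static void main(String[] args) {
        OrderedApplier applier = new OrderedApplier();
        applier.decide(1, "set a=1");
        applier.decide(3, "set c=3");                  // decided before instance 2
        System.out.println(applier.applyReady());      // 1: instance 2 is still a gap
        applier.decide(2, "set b=2");
        System.out.println(applier.applyReady());      // 2: instances 2 and 3
        System.out.println(applier.appliedCommands()); // [set a=1, set b=2, set c=3]
    }
}
```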

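The following code implements Multi-Paxos with a component-based architecture. It separates the log, the state machine, networking and role management into their own components to keep the code maintainable: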

public class MultiPaxosSystem {
    private final int nodeId;
    private final Configuration config;
    private final MultiPaxosLog log;
    private final MultiPaxosStateMachine stateMachine;
    private final MultiPaxosNetworking networking;
    private final RoleManager roleManager;
    private final ScheduledExecutorService scheduler;
    private final Logger logger = LoggerFactory.getLogger(MultiPaxosSystem.class);

    public MultiPaxosSystem(int nodeId, Configuration config) {
        this.nodeId = nodeId;
        this.config = config;
        this.log = new MultiPaxosLog();
        this.stateMachine = new MultiPaxosStateMachine();
        this.networking = new MultiPaxosNetworking(nodeId, config.getNodes());
        this.roleManager = new RoleManager(this);

        this.scheduler = Executors.newScheduledThreadPool(2, r -> {
            Thread t = new Thread(r, "multi-paxos-scheduler-" + nodeId);
            t.setDaemon(true);
            return t;
        });

        // Periodically apply committed log entries
        scheduler.scheduleWithFixedDelay(this::applyCommittedLogs, 100, 100, TimeUnit.MILLISECONDS);

        // Periodically check the leader lease
        scheduler.scheduleWithFixedDelay(this::checkLeaderLease, 1000, 1000, TimeUnit.MILLISECONDS);
    }

    // Client API

    // Append a new log entry (write operation)
    public CompletableFuture<Boolean> appendLog(byte[] command) {
        if (!roleManager.isLeader()) {
            return CompletableFuture.failedFuture(
                new NotLeaderException("Not the leader", roleManager.getLeaderHint()));
        }

        return roleManager.getLeaderRole().appendLog(command);
    }

    // Read with the requested consistency level
    public CompletableFuture<byte[]> read(String key, ConsistencyLevel level) {
        switch (level) {
            case LINEARIZABLE:
                if (!roleManager.isLeader()) {
                    return CompletableFuture.failedFuture(
                        new NotLeaderException("Not the leader", roleManager.getLeaderHint()));
                }
                return roleManager.getLeaderRole().linearizableRead(key);

            case SEQUENTIAL:
                return roleManager.getFollowerRole().sequentialRead(key);

            case EVENTUAL:
            default:
                return roleManager.getFollowerRole().eventualRead(key);
        }
    }

    // Try to become the leader
    public CompletableFuture<Boolean> electSelf() {
        return roleManager.electSelf();
    }

    // Apply committed log entries to the state machine
    private void applyCommittedLogs() {
        try {
            long applied = stateMachine.getLastApplied();
            long toApply = log.getCommitIndex();

            if (applied >= toApply) {
                return; // everything committed has already been applied
            }

            // Apply instances strictly in order from applied+1 to toApply. Replicas must apply
            // the same commands in the same order, so stop at the first entry that is missing
            // or not yet committed locally instead of skipping past it.
            for (long instanceId = applied + 1; instanceId <= toApply; instanceId++) {
                LogEntry entry = log.getEntry(instanceId);
                if (entry == null || !entry.isCommitted()) {
                    logger.debug("Instance {} not available yet, pausing apply", instanceId);
                    break;
                }

                // join() preserves order and keeps the next scheduled run from
                // re-applying entries whose apply is still in flight
                stateMachine.apply(instanceId, entry.getCommand()).join();
                logger.debug("Applied log entry at instance {} to state machine", instanceId);
            }

            // Compact the log once a large batch has been applied
            long nowApplied = stateMachine.getLastApplied();
            if (nowApplied - applied > 1000) {
                log.compactLogs(nowApplied);
            }
        } catch (CompletionException e) {
            logger.error("Failed to apply committed log entry", e.getCause());
        } catch (Exception e) {
            logger.error("Error applying committed logs", e);
        }
    }

    // Check the leader lease
    private void checkLeaderLease() {
        if (roleManager.isLeader()) {
            roleManager.getLeaderRole().checkLease();
        }
    }

    // Shut down the system
    public void shutdown() {
        try {
            List<Runnable> pendingTasks = scheduler.shutdownNow();
            if (!pendingTasks.isEmpty()) {
                logger.warn("Scheduler shutdown with {} pending tasks", pendingTasks.size());
            }

            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                logger.warn("Scheduler did not terminate in time");
            }

            networking.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for scheduler termination");
        }
    }

    // Role management
    public class RoleManager {
        private final MultiPaxosSystem system;
        private final AtomicBoolean isLeader = new AtomicBoolean(false);
        private final AtomicInteger currentBallot = new AtomicInteger(0);
        private volatile int leaderNodeId = -1; // -1 means unknown

        private final LeaderRole leaderRole;
        private final FollowerRole followerRole;

        public RoleManager(MultiPaxosSystem system) {
            this.system = system;
            this.leaderRole = new LeaderRole(system);
            this.followerRole = new FollowerRole(system);
        }

        public boolean isLeader() {
            return isLeader.get();
        }

        public int getLeaderHint() {
            return leaderNodeId;
        }

        public LeaderRole getLeaderRole() {
            return leaderRole;
        }

        public FollowerRole getFollowerRole() {
            return followerRole;
        }

        public int getCurrentBallot() {
            return currentBallot.get();
        }

        public void setCurrentBallot(int ballot) {
            currentBallot.set(ballot);
        }

        public CompletableFuture<Boolean> electSelf() {
            return leaderRole.electSelf().thenApply(elected -> {
                if (elected) {
                    isLeader.set(true);
                    leaderNodeId = nodeId;
                }
                return elected;
            });
        }

        public void stepDown() {
            if (isLeader.compareAndSet(true, false)) {
                logger.info("Node {} stepping down from leader", nodeId);
            }
        }

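        public void recognizeLeader(int leaderId, int ballot) {
            leaderNodeId = leaderId;
            currentBallot.set(ballot);
            // Keep the flag in sync with the recognized leader. Setting it here, and not only
            // in electSelf(), means LeaderRole.electSelf() already counts as leader when it
            // calls renewLease() right after winning Phase 1
            isLeader.set(leaderId == nodeId);
        }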
    }

    // Leader role implementation
    public class LeaderRole {
        private final MultiPaxosSystem system;
        private final AtomicLong leaseExpirationTime = new AtomicLong(0);
        private final long leaderLeaseMs = 5000; // 5-second lease

        public LeaderRole(MultiPaxosSystem system) {
            this.system = system;
        }

        // Leader election: run Phase 1 (Prepare) with a fresh, higher ballot
        public CompletableFuture<Boolean> electSelf() {
            final int newBallot;
            try {
                newBallot = generateNewBallot();
            } catch (Exception e) {
                logger.error("Error initiating leader election", e);
                return CompletableFuture.failedFuture(e);
            }

            return CompletableFuture.supplyAsync(() -> {
                // MDC is thread-local, so set it on the worker thread that does the work
                MDC.put("component", "multi-paxos-leader");
                MDC.put("nodeId", String.valueOf(nodeId));
                MDC.put("ballot", String.valueOf(newBallot));
                try {
                    logger.info("Node {} attempting to become leader with ballot {}", nodeId, newBallot);

                    // Phase 1: send Prepare for all instances, waiting at most 10 seconds
                    Map<Long, PrepareResponse> responseMap = networking.sendPrepareForAllInstances(newBallot)
                        .get(10, TimeUnit.SECONDS);

                    // Proceed only with promises from a majority
                    if (!hasQuorumPromises(responseMap)) {
                        logger.info("Failed to become leader - did not get quorum promises");
                        return false;
                    }

                    // Update the local log from the values reported in the promises
                    updateLogFromPromises(responseMap);

                    // Become leader
                    system.roleManager.setCurrentBallot(newBallot);
                    system.roleManager.recognizeLeader(nodeId, newBallot);

                    logger.info("Node {} became leader with ballot {}", nodeId, newBallot);
                    renewLease();

                    // Run the Accept phase so earlier entries are accepted by a majority
                    confirmPendingLogs();

                    return true;
                } catch (Exception e) {
                    logger.error("Error in become leader process", e);
                    return false;
                } finally {
                    MDC.remove("component");
                    MDC.remove("nodeId");
                    MDC.remove("ballot");
                }
            });
        }

        // Leader: append a new log entry
        public CompletableFuture<Boolean> appendLog(byte[] command) {
            if (!system.roleManager.isLeader()) {
                return CompletableFuture.failedFuture(
                    new NotLeaderException("Node is not the leader", system.roleManager.getLeaderHint()));
            }

            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                long nextInstance = system.log.getNextInstanceId();

                // Create and store the entry (the LogEntry constructor makes a defensive copy)
                int currentBallot = system.roleManager.getCurrentBallot();
                LogEntry entry = new LogEntry(currentBallot, command);
                system.log.setEntry(nextInstance, entry);

                // An established leader has already run Phase 1, so it skips Prepare and goes straight to Accept
                return CompletableFuture.supplyAsync(() -> {
                    // MDC is thread-local, so set it on the worker thread that does the work
                    MDC.put("component", "multi-paxos-leader");
                    MDC.put("nodeId", String.valueOf(nodeId));
                    MDC.put("instanceId", String.valueOf(nextInstance));
                    try {
                        logger.info("Leader {} appending log at instance {}", nodeId, nextInstance);
                        List<AcceptResponse> responses = networking.sendAcceptRequests(
                            nextInstance, currentBallot, entry.getCommand())
                            .get(5, TimeUnit.SECONDS);

                        if (countAccepts(responses) >= getQuorum()) {
                            // Accepted by a majority: commit. commitIndex only moves forward,
                            // so this assumes earlier instances have already committed.
                            entry.setCommitted(true);
                            system.log.updateCommitIndex(nextInstance);

                            // Tell all nodes to commit this instance
                            networking.sendCommitNotifications(nextInstance, currentBallot);

                            logger.info("Log entry at instance {} committed in {} ms",
                                       nextInstance, stopwatch.elapsed(TimeUnit.MILLISECONDS));
                            return true;
                        }

                        logger.warn("Failed to get quorum for instance {}", nextInstance);
                        // Leadership may have been lost: step down so a new election can run
                        system.roleManager.stepDown();
                        return false;
                    } catch (Exception e) {
                        logger.error("Error in append log", e);
                        throw new CompletionException(e);
                    } finally {
                        MDC.remove("component");
                        MDC.remove("nodeId");
                        MDC.remove("instanceId");
                    }
                });
            } catch (Exception e) {
                logger.error("Error initiating append log", e);
                return CompletableFuture.failedFuture(e);
            }
        }

        // Linearizable read: served by the leader while its lease is valid
        public CompletableFuture<byte[]> linearizableRead(String key) {
            if (!system.roleManager.isLeader()) {
                return CompletableFuture.failedFuture(
                    new NotLeaderException("Not the leader", system.roleManager.getLeaderHint()));
            }

            // Check the lease
            if (System.currentTimeMillis() >= leaseExpirationTime.get()) {
                // Lease expired: reconfirm leadership with a majority before reading
                return renewLease().thenCompose(renewed -> {
                    if (!renewed) {
                        return CompletableFuture.failedFuture(
                            new ConsistencyException("Could not renew leadership lease"));
                    }
                    return system.stateMachine.read(key);
                });
            }

            // Lease still valid: read the local state machine directly
            return system.stateMachine.read(key);
        }

        // Renew the leader lease
        private CompletableFuture<Boolean> renewLease() {
            if (!system.roleManager.isLeader()) {
                return CompletableFuture.completedFuture(false);
            }

            return CompletableFuture.supplyAsync(() -> {
                try {
                    // The lease must expire on the leader no later than on the followers, which
                    // can only start counting when they receive the heartbeat. Measuring from the
                    // send time (earlier than any receipt) guarantees that; the reply time would not.
                    long sentAt = System.currentTimeMillis();

                    // Send heartbeats to confirm that a majority still accepts this node as leader
                    int currentBallot = system.roleManager.getCurrentBallot();
                    int responses = networking.sendLeadershipHeartbeats(currentBallot)
                        .get(3, TimeUnit.SECONDS);

                    if (responses >= getQuorum()) {
                        leaseExpirationTime.set(sentAt + leaderLeaseMs);
                        logger.debug("Renewed leader lease until {}", leaseExpirationTime.get());
                        return true;
                    } else {
                        logger.warn("Failed to renew leadership lease");
                        return false;
                    }
                } catch (Exception e) {
                    logger.error("Error renewing leadership lease", e);
                    return false;
                }
            });
        }

        // Check the lease state and renew it before it expires
        public void checkLease() {
            if (!system.roleManager.isLeader()) {
                return;
            }

            long now = System.currentTimeMillis();
            long expiration = leaseExpirationTime.get();

            // Renew early if the lease expires within the next second
            if (now + 1000 > expiration) {
                renewLease().thenAccept(renewed -> {
                    if (!renewed) {
                        logger.warn("Lease renewal failed, stepping down as leader");
                        system.roleManager.stepDown();
                    }
                });
            }
        }

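        // Make sure earlier log entries are accepted by a majority under the new ballot
        private void confirmPendingLogs() {
            // Implementation omitted...
        }

        // Update the local log from the Prepare (promise) responses
        private void updateLogFromPromises(Map<Long, PrepareResponse> responseMap) {
            // Implementation omitted...
        }

        // Check whether promises from a majority were received
        private boolean hasQuorumPromises(Map<Long, PrepareResponse> responseMap) {
            // Implementation omitted...
            return true; // simplified: always assumes a quorum
        }

        // Count positive accept responses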
        private int countAccepts(List<AcceptResponse> responses) {
            return (int) responses.stream()
                .filter(r -> r != null && r.isAccepted())
                .count();
        }
    }

    // Follower role implementation
    public class FollowerRole {
        private final MultiPaxosSystem system;
        private final Map<String, CacheEntry> readCache = new ConcurrentHashMap<>();
        private final long maxCacheAgeMs = 5000; // cache entries expire after 5 seconds

        public FollowerRole(MultiPaxosSystem system) {
            this.system = system;
        }

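        // Handle a heartbeat message from the leader
        public void handleHeartbeat(int leaderBallot, int leaderNodeId, long leaderCommitIndex) {
            int localBallot = system.roleManager.getCurrentBallot();

            // Ignore heartbeats from a stale leader whose ballot is lower than one we already know
            if (leaderBallot < localBallot) {
                logger.debug("Ignoring heartbeat with stale ballot {} (current {})",
                            leaderBallot, localBallot);
                return;
            }

            // If this node thinks it is the leader but sees a higher ballot, step down
            if (system.roleManager.isLeader() && leaderBallot > localBallot) {
                logger.info("Stepping down as leader due to heartbeat with higher ballot: {}",
                           leaderBallot);
                system.roleManager.stepDown();
            }

            // Record the current leader, then advance the local commit index
            system.roleManager.recognizeLeader(leaderNodeId, leaderBallot);
            system.log.updateCommitIndex(leaderCommitIndex);
        }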

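        // Sequentially consistent read: reflects every entry this node knew was committed at call time
        public CompletableFuture<byte[]> sequentialRead(String key) {
            // Wait until the state machine has applied everything up to the current commit index
            return ensureAppliedUpToCommitIndex()
                .thenCompose(v -> system.stateMachine.read(key));
        }

        // Eventually consistent read: read local state directly (may be stale)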
        public CompletableFuture<byte[]> eventualRead(String key) {
            return system.stateMachine.read(key);
        }

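        // Wait until the state machine has applied everything up to the current commitIndex
        private CompletableFuture<Void> ensureAppliedUpToCommitIndex() {
            long target = system.log.getCommitIndex();
            if (system.stateMachine.getLastApplied() >= target) {
                return CompletableFuture.completedFuture(null); // everything already applied
            }

            CompletableFuture<Void> result = new CompletableFuture<>();
            scheduler.execute(() -> pollApplied(target, result, 0));
            return result;
        }

        // Trigger log application once, then re-check every 50 ms; give up after 100 attempts (~5 s)
        private void pollApplied(long target, CompletableFuture<Void> result, int attempt) {
            try {
                if (attempt == 0) {
                    system.applyCommittedLogs();
                }

                if (system.stateMachine.getLastApplied() >= target) {
                    result.complete(null);
                } else if (attempt >= 100) {
                    result.completeExceptionally(new ConsistencyException(
                        "Timed out waiting for entries up to " + target + " to be applied"));
                } else {
                    // Application is asynchronous: check again later
                    scheduler.schedule(() -> pollApplied(target, result, attempt + 1),
                                       50, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        }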

        // Evict expired cache entries, then trim the cache to its maximum size
        public void cleanupReadCache() {
            long now = System.currentTimeMillis();
            // Remove expired entries
            readCache.entrySet().removeIf(entry ->
                now - entry.getValue().getTimestamp() > maxCacheAgeMs);

            // If the cache is still too large, evict the oldest entries. Compute the excess
            // once: the map is concurrent, and a negative limit() would throw.
            int excess = readCache.size() - config.getMaxCacheSize();
            if (excess > 0) {
                List<String> oldestKeys = readCache.entrySet().stream()
                    .sorted(Comparator.comparingLong(e -> e.getValue().getTimestamp()))
                    .limit(excess)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());

                for (String key : oldestKeys) {
                    readCache.remove(key);
                }
                logger.info("Cache cleanup: removed {} old entries", oldestKeys.size());
            }
        }
    }

    // Internal components

    // Log management
    public static class MultiPaxosLog {
        private final ReadWriteLock logLock = new ReentrantReadWriteLock();
        private final ConcurrentNavigableMap<Long, LogEntry> log = new ConcurrentSkipListMap<>();
        private final AtomicLong nextInstanceId = new AtomicLong(1);
        private final AtomicLong commitIndex = new AtomicLong(0);
        private final Logger logger = LoggerFactory.getLogger(MultiPaxosLog.class);

        public LogEntry getEntry(long index) {
            logLock.readLock().lock();
            try {
                return log.get(index);
            } finally {
                logLock.readLock().unlock();
            }
        }

        public void setEntry(long index, LogEntry entry) {
            logLock.writeLock().lock();
            try {
                log.put(index, entry);
                nextInstanceId.updateAndGet(current -> Math.max(current, index + 1));
            } finally {
                logLock.writeLock().unlock();
            }
        }

        // Reserves a new instance id: each call returns the next id and advances the counter
        public long getNextInstanceId() {
            return nextInstanceId.getAndIncrement();
        }

        public long getCommitIndex() {
            return commitIndex.get();
        }

        public void updateCommitIndex(long newCommitIndex) {
            // Atomically advance the commit index; it never moves backwards
            commitIndex.updateAndGet(current -> Math.max(current, newCommitIndex));
        }

        // Log compaction
        public void compactLogs(long appliedIndex) {
            // Keep the most recent entries and delete older ones
            final int retentionWindow = 1000; // keep the latest 1000 applied entries
            long truncatePoint = appliedIndex - retentionWindow;

            if (truncatePoint <= 0) {
                return; // nothing to compact yet
            }

            logLock.writeLock().lock();
            try {
                List<Long> toRemove = log.keySet().stream()
                    .filter(idx -> idx < truncatePoint)
                    .collect(Collectors.toList());

                for (Long idx : toRemove) {
                    log.remove(idx);
                }

                logger.info("Compacted {} log entries before index {}",
                           toRemove.size(), truncatePoint);
            } finally {
                logLock.writeLock().unlock();
            }
        }
    }

    // State machine implementation
    public static class MultiPaxosStateMachine {
        private final AtomicLong lastApplied = new AtomicLong(0);
        private final Map<String, byte[]> keyValueStore = new ConcurrentHashMap<>();
        private final Logger logger = LoggerFactory.getLogger(MultiPaxosStateMachine.class);

        public CompletableFuture<Void> apply(long instanceId, byte[] command) {
            // Apply synchronously on the calling thread: commands must take effect in log
            // order, and runAsync on the shared pool could run two applies out of order
            try {
                // Parse the command
                Command cmd = deserializeCommand(command);

                // Apply it to the state machine
                if (cmd.getType() == CommandType.PUT) {
                    keyValueStore.put(cmd.getKey(), cmd.getValue());
                } else if (cmd.getType() == CommandType.DELETE) {
                    keyValueStore.remove(cmd.getKey());
                }

                // Advance the last-applied index
                lastApplied.updateAndGet(current -> Math.max(current, instanceId));
                return CompletableFuture.completedFuture(null);
            } catch (Exception e) {
                logger.error("Error applying command at instance {}", instanceId, e);
                return CompletableFuture.failedFuture(e);
            }
        }

        public CompletableFuture<byte[]> read(String key) {
            return CompletableFuture.supplyAsync(() -> {
                byte[] value = keyValueStore.get(key);
                return value != null ? value.clone() : null; // defensive copy
            });
        }

        public long getLastApplied() {
            return lastApplied.get();
        }

        public CompletableFuture<byte[]> takeSnapshot() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    // Create a snapshot of the state machine
                    return serializeState();
                } catch (Exception e) {
                    logger.error("Error taking snapshot", e);
                    throw new CompletionException(e);
                }
            });
        }

        public CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId) {
            return CompletableFuture.runAsync(() -> {
                try {
                    // Restore state from the snapshot
                    deserializeState(snapshot);

                    // Update the last-applied index
                    lastApplied.set(instanceId);
                } catch (Exception e) {
                    logger.error("Error restoring snapshot", e);
                    throw new CompletionException(e);
                }
            });
        }

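        // Serialization and deserialization helpers (simplified placeholders)
        private Command deserializeCommand(byte[] data) {
            // A real implementation should use a proper serialization format
            return new Command(CommandType.PUT, "key", data); // simplified: always a PUT on "key"
        }

        private byte[] serializeState() {
            // A real implementation should use a proper serialization format
            return new byte[0]; // simplified
        }

        private void deserializeState(byte[] data) {
            // A real implementation should use a proper serialization format
            // Simplified: no-op
        }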
    }

    // Network layer
    public static class MultiPaxosNetworking implements AutoCloseable {
        private final int nodeId;
        private final Map<Integer, NodeInfo> nodes;
        private final NetworkClient client;
        private final Logger logger = LoggerFactory.getLogger(MultiPaxosNetworking.class);

        public MultiPaxosNetworking(int nodeId, Map<Integer, NodeInfo> nodes) {
            this.nodeId = nodeId;
            this.nodes = new HashMap<>(nodes);
            this.client = createNetworkClient();
        }

        private NetworkClient createNetworkClient() {
            // A real implementation would create a proper network client here
            return new NetworkClientImpl();
        }

        public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareForAllInstances(int ballot) {
            // Implementation omitted (stub: returns no responses)
            return CompletableFuture.completedFuture(new HashMap<>());
        }

        public CompletableFuture<List<AcceptResponse>> sendAcceptRequests(
                long instanceId, int ballot, byte[] command) {
            // Implementation omitted (stub: returns no responses)
            return CompletableFuture.completedFuture(new ArrayList<>());
        }

        public CompletableFuture<Integer> sendLeadershipHeartbeats(int ballot) {
            // Implementation omitted (stub: reports zero acknowledgements)
            return CompletableFuture.completedFuture(0);
        }

        public void sendCommitNotifications(long instanceId, int ballot) {
            // Implementation omitted
        }

        @Override
        public void close() {
            // Close the network client (omitted)
        }
    }

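    // Generate a new, globally unique ballot
    private int generateNewBallot() {
        // The result is strictly greater than the current ballot, and ballot % totalNodes
        // identifies the node, so two nodes never generate the same ballot
        // (assumes node ids are distinct modulo totalNodes)
        int currentBallot = roleManager.getCurrentBallot();
        return (currentBallot / config.getTotalNodes() + 1) * config.getTotalNodes() + nodeId;
    }

    // Majority (quorum) size
    private int getQuorum() {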
        return config.getTotalNodes() / 2 + 1;
    }

    // Log entry
    public static class LogEntry {
        private volatile int ballot; // volatile: setBallot may be called from another thread
        private final byte[] command;
        private volatile boolean committed;

        LogEntry(int ballot, byte[] command) {
            this.ballot = ballot;
            this.command = command.clone(); // defensive copy
            this.committed = false;
        }

        public int getBallot() {
            return ballot;
        }

        public void setBallot(int ballot) {
            this.ballot = ballot;
        }

        public byte[] getCommand() {
            return command.clone(); // defensive copy
        }

        public boolean isCommitted() {
            return committed;
        }

        public void setCommitted(boolean committed) {
            this.committed = committed;
        }
    }

    // Configuration
    public static class Configuration {
        private final int totalNodes;
        private final Map<Integer, NodeInfo> nodes;
        private final int maxCacheSize;

        public Configuration(int totalNodes, Map<Integer, NodeInfo> nodes, int maxCacheSize) {
            this.totalNodes = totalNodes;
            this.nodes = new HashMap<>(nodes);
            this.maxCacheSize = maxCacheSize;
        }

        public int getTotalNodes() {
            return totalNodes;
        }

        public Map<Integer, NodeInfo> getNodes() {
            return Collections.unmodifiableMap(nodes);
        }

        public int getMaxCacheSize() {
            return maxCacheSize;
        }
    }

    // Node information
    public static class NodeInfo {
        private final int id;
        private final String host;
        private final int port;

        public NodeInfo(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        public int getId() {
            return id;
        }

        public String getHost() {
            return host;
        }

        public int getPort() {
            return port;
        }
    }

    // Command type
    enum CommandType {
        PUT, DELETE
    }

    // Command object
    static class Command {
        private final CommandType type;
        private final String key;
        private final byte[] value;

        public Command(CommandType type, String key, byte[] value) {
            this.type = type;
            this.key = key;
            this.value = value != null ? value.clone() : null; // defensive copy
        }

        public CommandType getType() {
            return type;
        }

        public String getKey() {
            return key;
        }

        public byte[] getValue() {
            return value != null ? value.clone() : null; // defensive copy
        }
    }

    // Response types
    public static class PrepareResponse {
        // Fields omitted...
    }

    public static class AcceptResponse {
        private final boolean accepted;

        public AcceptResponse(boolean accepted) {
            this.accepted = accepted;
        }

        public boolean isAccepted() {
            return accepted;
        }
    }

    // Consistency levels
    public enum ConsistencyLevel {
        LINEARIZABLE,    // linearizability
        SEQUENTIAL,      // sequential consistency
        EVENTUAL         // eventual consistency
    }

    // Exceptions
    public static class NotLeaderException extends RuntimeException {
        private final int leaderHint;

        public NotLeaderException(String message, int leaderHint) {
            super(message);
            this.leaderHint = leaderHint;
        }

        public int getLeaderHint() {
            return leaderHint;
        }
    }

    public static class ConsistencyException extends RuntimeException {
        public ConsistencyException(String message) {
            super(message);
        }
    }

    // Simplified network client (all methods are stubs)
    private static class NetworkClientImpl implements NetworkClient {
        // Network interface methods, not implemented in this example

        @Override
        public CompletableFuture<Promise> sendPrepare(int nodeId, int ballot) {
            return null;
        }

        @Override
        public CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value) {
            return null;
        }

        @Override
        public void sendLearn(int nodeId, long instanceId, int ballot, Object value) {

        }

        @Override
        public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot) {
            return null;
        }

        @Override
        public CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId) {
            return null;
        }
    }
}
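In the listing above, `updateLogFromPromises` and `hasQuorumPromises` are left as stubs, and the latter always returns true. The standalone class below is a rough sketch of what those two helpers need to do for a single instance. It is not the author's implementation. It assumes a hypothetical `Promise` type that carries one acceptor's highest accepted ballot and value, and it reuses the formulas from `generateNewBallot()` and `getQuorum()`. The new leader counts promises from distinct acceptors against a majority. If any acceptor already accepted a value, the leader must re-propose the one with the highest accepted ballot, and only otherwise may it propose its own.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class PhaseOneSketch {

    // Hypothetical promise from one acceptor for ONE instance.
    // acceptedBallot < 0 means that acceptor has not accepted any value yet.
    static final class Promise {
        final int acceptorId;
        final int acceptedBallot;
        final String acceptedValue;

        Promise(int acceptorId, int acceptedBallot, String acceptedValue) {
            this.acceptorId = acceptorId;
            this.acceptedBallot = acceptedBallot;
            this.acceptedValue = acceptedValue;
        }
    }

    // Same formula as generateNewBallot(): strictly larger than currentBallot,
    // and ballots produced by different node ids differ modulo totalNodes
    static int nextBallot(int currentBallot, int totalNodes, int nodeId) {
        return (currentBallot / totalNodes + 1) * totalNodes + nodeId;
    }

    static int quorum(int totalNodes) {
        return totalNodes / 2 + 1;
    }

    // Returns the value the new leader must send in Phase 2 for this instance,
    // or Optional.empty() if fewer than a majority of distinct acceptors promised
    static Optional<String> chooseValue(List<Promise> promises, int totalNodes, String ownValue) {
        long distinctAcceptors = promises.stream().map(p -> p.acceptorId).distinct().count();
        if (distinctAcceptors < quorum(totalNodes)) {
            return Optional.empty();
        }
        // Paxos safety rule: adopt the value accepted under the highest ballot, if any
        return promises.stream()
            .filter(p -> p.acceptedBallot >= 0)
            .max(Comparator.comparingInt(p -> p.acceptedBallot))
            .map(p -> p.acceptedValue)
            .or(() -> Optional.of(ownValue));
    }

    public static void main(String[] args) {
        System.out.println(nextBallot(4, 3, 2)); // (4 / 3 + 1) * 3 + 2 = 8

        List<Promise> promises = List.of(
            new Promise(0, -1, null),   // acceptor 0 accepted nothing
            new Promise(1, 5, "x=1"));  // acceptor 1 accepted "x=1" under ballot 5
        System.out.println(chooseValue(promises, 3, "x=2").orElse("no quorum")); // x=1

        List<Promise> tooFew = List.of(new Promise(0, -1, null));
        System.out.println(chooseValue(tooFew, 3, "x=2").orElse("no quorum"));   // no quorum
    }
}
```

In a real Multi-Paxos takeover this choice is repeated for every instance in the range covered by `sendPrepareForAllInstances`. A leader without a majority of promises must not enter Phase 2 at all.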

4. Network Partition Handling and Membership Changes

Network partition detection

public class PartitionHandler implements AutoCloseable {
    private final String nodeId;
    private final AtomicLong lastHeartbeatTime = new AtomicLong(0);
    private final AtomicBoolean suspectPartition = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler;
    private final long heartbeatTimeoutMs;
    private final Consumer<PartitionEvent> partitionCallback;
    private final Logger logger = LoggerFactory.getLogger(PartitionHandler.class);

    public PartitionHandler(String nodeId, long heartbeatTimeoutMs,
                          Consumer<PartitionEvent> partitionCallback) {
        this.nodeId = nodeId;
        this.heartbeatTimeoutMs = heartbeatTimeoutMs;
        this.partitionCallback = partitionCallback;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "partition-detector-" + nodeId);
            t.setDaemon(true);
            return t;
        });

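        // Start the heartbeat check, running every half timeout
        // (at least every 1 ms: scheduleAtFixedRate rejects a period of 0)
        long checkIntervalMs = Math.max(1, heartbeatTimeoutMs / 2);
        scheduler.scheduleAtFixedRate(
            this::checkHeartbeat,
            checkIntervalMs,
            checkIntervalMs,
            TimeUnit.MILLISECONDS
        );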
    }

    // Record a received heartbeat; clears any suspected partition
    public void recordHeartbeat() {
        lastHeartbeatTime.set(System.currentTimeMillis());
        if (suspectPartition.compareAndSet(true, false)) {
            logger.info("Node {} no longer suspects network partition", nodeId);
            partitionCallback.accept(new PartitionEvent(PartitionStatus.RECOVERED, nodeId));
        }
    }

    // Check for a heartbeat timeout (nothing is suspected before the first heartbeat arrives)
    private void checkHeartbeat() {
        try {
            long now = System.currentTimeMillis();
            long last = lastHeartbeatTime.get();

            if (last > 0 && now - last > heartbeatTimeoutMs) {
                // No heartbeat within the timeout: possible network partition
                if (suspectPartition.compareAndSet(false, true)) {
                    logger.warn("Node {} suspects network partition, last heartbeat: {}ms ago",
                               nodeId, now - last);

                    // Notify the partition callback
                    partitionCallback.accept(new PartitionEvent(PartitionStatus.SUSPECTED, nodeId));
                }
            }
        } catch (Exception e) {
            logger.error("Error checking heartbeat", e);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
        try {
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
                logger.warn("Partition detector scheduler did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while shutting down partition detector");
        }
    }

    // Partition status
    public enum PartitionStatus {
        SUSPECTED,  // partition suspected
        CONFIRMED,  // partition confirmed (never emitted by this class)
        RECOVERED   // partition healed
    }

    // Partition event
    public static class PartitionEvent {
        private final PartitionStatus status;
        private final String nodeId;

        public PartitionEvent(PartitionStatus status, String nodeId) {
            this.status = status;
            this.nodeId = nodeId;
        }

        public PartitionStatus getStatus() {
            return status;
        }

        public String getNodeId() {
            return nodeId;
        }
    }
}
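`PartitionHandler` measures the heartbeat gap with `System.currentTimeMillis()`. That method follows the wall clock, which can jump when the system time is adjusted (for example by NTP), and a jump can produce false or missed suspicions. The sketch below runs the same timeout check on a monotonic clock. It assumes a hypothetical `HeartbeatTimeout` class that takes its time source as a `LongSupplier`, so a test can drive the check with a fake clock instead of `System.nanoTime`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class HeartbeatTimeout {
    private final LongSupplier nanoClock;   // monotonic time source, e.g. System::nanoTime
    private final long timeoutNanos;
    private final AtomicLong lastHeartbeat; // nanoClock reading at the last heartbeat

    public HeartbeatTimeout(long timeoutMs, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        // The timer starts at construction time, not at the first heartbeat
        this.lastHeartbeat = new AtomicLong(nanoClock.getAsLong());
    }

    public HeartbeatTimeout(long timeoutMs) {
        this(timeoutMs, System::nanoTime);
    }

    public void recordHeartbeat() {
        lastHeartbeat.set(nanoClock.getAsLong());
    }

    // nanoTime has an arbitrary origin and may be negative: only differences are meaningful
    public boolean isSuspected() {
        return nanoClock.getAsLong() - lastHeartbeat.get() > timeoutNanos;
    }

    public static void main(String[] args) {
        AtomicLong fakeNow = new AtomicLong(0);
        HeartbeatTimeout hb = new HeartbeatTimeout(500, fakeNow::get);

        fakeNow.addAndGet(TimeUnit.MILLISECONDS.toNanos(400));
        System.out.println(hb.isSuspected()); // false: 400 ms since start

        fakeNow.addAndGet(TimeUnit.MILLISECONDS.toNanos(200));
        System.out.println(hb.isSuspected()); // true: 600 ms since start

        hb.recordHeartbeat();
        System.out.println(hb.isSuspected()); // false: heartbeat just arrived
    }
}
```

Unlike `PartitionHandler`, which never suspects a partition before the first heartbeat (`last > 0`), this sketch starts the timer at construction. Which behavior is better depends on whether a node that has never heard from a leader should count as partitioned.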

Membership change implementation
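The `addServer` method below commits a joint configuration that contains both the old and the new servers, and only then the final configuration. The body of `commitConfiguration` is not shown, so the following is an assumption about how it decides commitment, based on the joint-consensus rule used by Raft. While the joint configuration is active, a decision needs a majority of the old members and, counted separately, a majority of the new members. A minimal sketch of that check, with hypothetical names `JointQuorum` and `hasJointQuorum`:

```java
import java.util.Set;

public final class JointQuorum {
    private JointQuorum() {}

    // True if acks contain a strict majority of the given member set
    static boolean hasMajority(Set<String> members, Set<String> acks) {
        long votes = members.stream().filter(acks::contains).count();
        return votes > members.size() / 2;
    }

    // Joint consensus: while the joint configuration is active, a decision needs a
    // majority of the OLD members and, counted separately, a majority of the NEW members
    static boolean hasJointQuorum(Set<String> oldMembers, Set<String> newMembers, Set<String> acks) {
        return hasMajority(oldMembers, acks) && hasMajority(newMembers, acks);
    }

    public static void main(String[] args) {
        Set<String> oldConfig = Set.of("s1", "s2", "s3");
        Set<String> newConfig = Set.of("s1", "s2", "s3", "s4");

        // 2 of 3 old members, but only 2 of 4 new members
        System.out.println(hasJointQuorum(oldConfig, newConfig, Set.of("s1", "s2")));       // false
        // 2 of 3 old members and 3 of 4 new members
        System.out.println(hasJointQuorum(oldConfig, newConfig, Set.of("s1", "s2", "s4"))); // true
    }
}
```

Requiring both majorities means the old and new configurations can never commit conflicting decisions independently during the transition. This is why the change goes through the joint phase instead of switching directly to the new member set.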

public class MembershipManager implements AutoCloseable {
    private final ConcurrentMap<String, ServerInfo> servers = new ConcurrentHashMap<>();
    private volatile Configuration currentConfig;
    private final AtomicLong configVersion = new AtomicLong(0);
    private final String nodeId;
    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final Logger logger = LoggerFactory.getLogger(MembershipManager.class);
    private final ConfigurationStore configStore;
    private final NetworkClient networkClient;
    private final StampedLock configLock = new StampedLock();

    public MembershipManager(String nodeId, boolean isLeader,
                           ConfigurationStore configStore,
                           NetworkClient networkClient) {
        this.nodeId = nodeId;
        this.isLeader.set(isLeader);
        this.configStore = configStore;
        this.networkClient = networkClient;

        // Load the persisted configuration; fall back to an empty one if none exists or loading fails
        try {
            this.currentConfig = configStore.loadConfiguration();
            if (this.currentConfig == null) {
                this.currentConfig = new Configuration(configVersion.get(), new HashMap<>());
            }
            servers.putAll(currentConfig.getServers());
        } catch (IOException e) {
            logger.error("Failed to load configuration", e);
            this.currentConfig = new Configuration(configVersion.get(), new HashMap<>());
        }
    }

    // Two-phase membership change: safely add a server
    public CompletableFuture<Boolean> addServer(String serverId, String address, int port) {
        if (!isLeader.get()) {
            logger.warn("Only leader can change membership (node {}, target {})", nodeId, serverId);
            return CompletableFuture.failedFuture(
                new IllegalStateException("Only leader can change membership"));
        }

        CompletableFuture<Boolean> result = new CompletableFuture<>();

        CompletableFuture.runAsync(() -> {
            long stamp = configLock.writeLock();
            // Hold the write lock until BOTH phases have finished, so membership changes never
            // interleave. StampedLock is not owned by a thread, so it can be released from
            // whichever thread completes the result.
            result.whenComplete((ok, err) -> configLock.unlockWrite(stamp));

            // MDC is thread-local, so set it on the worker thread that does the work
            MDC.put("component", "membership-manager");
            MDC.put("nodeId", nodeId);
            MDC.put("targetServerId", serverId);
            try {
                logger.info("Starting server addition: {}", serverId);

                // Phase 1: joint configuration containing all old and new servers
                Configuration oldConfig = currentConfig;
                Configuration jointConfig = createJointConfig(oldConfig, serverId, address, port);

                commitConfiguration(jointConfig)
                    .thenCompose(committed -> {
                        if (!committed) {
                            logger.warn("Failed to commit joint configuration for server {}", serverId);
                            return CompletableFuture.completedFuture(false);
                        }
                        logger.info("Joint configuration committed, proceeding to second phase");

                        // Phase 2: final configuration that includes the new server
                        return commitConfiguration(createNewConfig(jointConfig));
                    })
                    .whenComplete((added, err) -> {
                        if (err != null) {
                            logger.error("Error adding server {}", serverId, err);
                            result.completeExceptionally(err);
                            return;
                        }
                        if (added) {
                            logger.info("Server {} successfully added to cluster", serverId);
                        } else {
                            logger.warn("Server {} was not added to cluster", serverId);
                        }
                        result.complete(added);
                    });
            } catch (Exception e) {
                logger.error("Error adding server {}", serverId, e);
                result.completeExceptionally(e);
            } finally {
                MDC.remove("component");
                MDC.remove("nodeId");
                MDC.remove("targetServerId");
            }
        });

        return result;
    }

    // Two-phase membership change: safely remove a node
    public CompletableFuture<Boolean> removeServer(String serverId) {
        if (!isLeader.get()) {
            logger.warn("Only leader can change membership (target server {})", serverId);
            return CompletableFuture.failedFuture(
                new IllegalStateException("Only leader can change membership"));
        }

        if (!servers.containsKey(serverId)) {
            logger.warn("Server {} not found in configuration", serverId);
            return CompletableFuture.completedFuture(false);
        }

        CompletableFuture<Boolean> result = new CompletableFuture<>();

        CompletableFuture.runAsync(() -> {
            // MDC is thread-local, so set (and clear) it on the worker thread itself
            MDC.put("component", "membership-manager");
            MDC.put("nodeId", nodeId);
            MDC.put("targetServerId", serverId);
            try {
                // Hold the write lock until BOTH phases finish, so membership changes cannot
                // interleave; the completion callback releases it (StampedLock allows this)
                long stamp = configLock.writeLock();
                CompletableFuture<Boolean> change;
                try {
                    logger.info("Starting server removal: {}", serverId);

                    // Phase 1: joint configuration in which the target node is marked for removal
                    Configuration jointConfig = createJointConfig(currentConfig, serverId);

                    // Commit the joint configuration, then the final one
                    change = commitConfiguration(jointConfig).thenCompose(committed -> {
                        if (!committed) {
                            logger.warn("Failed to commit joint configuration for removing server {}",
                                        serverId);
                            return CompletableFuture.completedFuture(false);
                        }
                        logger.info("Joint configuration committed, proceeding to second phase");

                        // Phase 2: final configuration without the target node
                        return commitConfiguration(createNewConfigWithout(jointConfig, serverId));
                    });
                } catch (RuntimeException e) {
                    change = CompletableFuture.failedFuture(e);
                }

                change.whenComplete((removed, e) -> {
                    configLock.unlockWrite(stamp);
                    if (e != null) {
                        logger.error("Error removing server {}", serverId, e);
                        result.completeExceptionally(e);
                    } else {
                        if (removed) {
                            logger.info("Server {} successfully removed from cluster", serverId);
                        } else {
                            logger.warn("Server {} was not removed from cluster", serverId);
                        }
                        result.complete(removed);
                    }
                });
            } finally {
                MDC.remove("component");
                MDC.remove("nodeId");
                MDC.remove("targetServerId");
            }
        });

        return result;
    }

    // Create the joint configuration (adding a node)
    private Configuration createJointConfig(Configuration oldConfig,
                                          String newServerId, String address, int port) {
        Map<String, ServerInfo> newServers = new HashMap<>(oldConfig.getServers());
        newServers.put(newServerId, new ServerInfo(newServerId, address, port));

        return new Configuration(configVersion.incrementAndGet(), newServers);
    }

    // Create the joint configuration (removing a node)
    private Configuration createJointConfig(Configuration oldConfig, String serverId) {
        // The node stays in the joint configuration but is marked as pending removal
        Map<String, ServerInfo> jointServers = new HashMap<>(oldConfig.getServers());
        ServerInfo serverInfo = jointServers.get(serverId);
        if (serverInfo != null) {
            ServerInfo markedServer = new ServerInfo(
                serverId, serverInfo.getAddress(), serverInfo.getPort(), true);
            jointServers.put(serverId, markedServer);
        }

        return new Configuration(configVersion.incrementAndGet(), jointServers);
    }

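    // Create the final configuration (confirming the addition)
    private Configuration createNewConfig(Configuration jointConfig) {
        // Final configuration: drop nodes marked for removal and clear all flags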
        Map<String, ServerInfo> newServers = new HashMap<>();
        for (var entry : jointConfig.getServers().entrySet()) {
            if (!entry.getValue().isMarkedForRemoval()) {
                newServers.put(entry.getKey(), new ServerInfo(
                    entry.getValue().getId(),
                    entry.getValue().getAddress(),
                    entry.getValue().getPort(),
                    false
                ));
            }
        }

        return new Configuration(configVersion.incrementAndGet(), newServers);
    }

    // Create the final configuration (confirming the removal)
    private Configuration createNewConfigWithout(Configuration jointConfig, String serverId) {
        Map<String, ServerInfo> newServers = new HashMap<>();
        for (var entry : jointConfig.getServers().entrySet()) {
            if (!entry.getKey().equals(serverId) && !entry.getValue().isMarkedForRemoval()) {
                newServers.put(entry.getKey(), new ServerInfo(
                    entry.getValue().getId(),
                    entry.getValue().getAddress(),
                    entry.getValue().getPort(),
                    false
                ));
            }
        }

        return new Configuration(configVersion.incrementAndGet(), newServers);
    }

    // Commit a configuration change
    private CompletableFuture<Boolean> commitConfiguration(Configuration config) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // A real implementation would commit the change through the consensus protocol
                logger.info("Committing configuration version {}", config.getVersion());

                // Persist the configuration
                configStore.saveConfiguration(config);

                // Update the local view
                synchronized (this) {
                    currentConfig = config;
                    servers.clear();
                    servers.putAll(config.getServers());
                }

                // Broadcast the change to the other nodes
                broadcastConfigChange(config);

                return true;
            } catch (Exception e) {
                logger.error("Error committing configuration", e);
                return false;
            }
        });
    }

    // Broadcast a configuration change
    private void broadcastConfigChange(Configuration config) {
        // Send the new configuration to every other node
        for (String serverId : servers.keySet()) {
            if (!serverId.equals(nodeId)) {
                CompletableFuture.runAsync(() -> {
                    try {
                        // A real implementation would send the configuration via the network client
                        notifyConfigChange(serverId, config);
                    } catch (Exception e) {
                        logger.error("Failed to notify server {} of config change", serverId, e);
                    }
                });
            }
        }
    }

    // Notify a single node of a configuration change
    private void notifyConfigChange(String serverId, Configuration config) {
        // A real implementation would send the configuration to the given node
        logger.debug("Notifying server {} of configuration change to version {}",
                   serverId, config.getVersion());
    }

    // Handle a received configuration change
    public void handleConfigChange(Configuration newConfig) {
        long stamp = configLock.writeLock();
        try {
            if (newConfig.getVersion() > currentConfig.getVersion()) {
                try {
                    // Persist the new configuration
                    configStore.saveConfiguration(newConfig);

                    // Update the local view
                    currentConfig = newConfig;
                    servers.clear();
                    servers.putAll(newConfig.getServers());

                    logger.info("Updated to new configuration version {}", newConfig.getVersion());
                } catch (IOException e) {
                    logger.error("Failed to persist new configuration", e);
                }
            } else {
                logger.debug("Ignoring old configuration version {} (current is {})",
                           newConfig.getVersion(), currentConfig.getVersion());
            }
        } finally {
            configLock.unlockWrite(stamp);
        }
    }

    // Get the current configuration (optimistic read, falling back to a read lock)
    public Configuration getCurrentConfig() {
        long stamp = configLock.tryOptimisticRead();
        Configuration config = currentConfig;

        if (!configLock.validate(stamp)) {
            stamp = configLock.readLock();
            try {
                config = currentConfig;
            } finally {
                configLock.unlockRead(stamp);
            }
        }

        return config;
    }

    // Check whether a node is in the configuration (excluding nodes marked for removal)
    public boolean isServerInConfig(String serverId) {
        ServerInfo info = servers.get(serverId);
        return info != null && !info.isMarkedForRemoval();
    }

    // Count active servers (excluding nodes marked for removal)
    public int getActiveServerCount() {
        return (int) servers.values().stream()
            .filter(s -> !s.isMarkedForRemoval())
            .count();
    }

    // Set the leader state
    public void setLeader(boolean isLeader) {
        this.isLeader.set(isLeader);
    }

    @Override
    public void close() {
        // Release resources
    }

    // Configuration class
    public static class Configuration implements Serializable {
        private static final long serialVersionUID = 1L;

        private final long version;
        private final Map<String, ServerInfo> servers;

        public Configuration(long version, Map<String, ServerInfo> servers) {
            this.version = version;
            this.servers = new HashMap<>(servers);
        }

        public long getVersion() {
            return version;
        }

        public Map<String, ServerInfo> getServers() {
            return Collections.unmodifiableMap(servers);
        }
    }

    // Server information
    public static class ServerInfo implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String id;
        private final String address;
        private final int port;
        private final boolean markedForRemoval;

        public ServerInfo(String id, String address, int port) {
            this(id, address, port, false);
        }

        public ServerInfo(String id, String address, int port, boolean markedForRemoval) {
            this.id = id;
            this.address = address;
            this.port = port;
            this.markedForRemoval = markedForRemoval;
        }

        public String getId() {
            return id;
        }

        public String getAddress() {
            return address;
        }

        public int getPort() {
            return port;
        }

        public boolean isMarkedForRemoval() {
            return markedForRemoval;
        }
    }
}
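In the code above, the joint configuration is simply the union of the old and new members, committed like any other configuration. In Raft-style joint consensus, which this two-phase scheme resembles, a decision taken while the joint configuration C_old,new is in force must be acknowledged by a majority of C_old and, separately, by a majority of C_new. The sketch below illustrates only that quorum rule; `JointQuorum` and its methods are hypothetical names, not part of the article's `MembershipManager`.

```java
import java.util.Set;

// Hypothetical helper (not from the article): quorum check while a joint
// configuration C_old,new is active.
final class JointQuorum {
    private JointQuorum() {}

    // True if `acks` contains a strict majority of `members`
    static boolean hasMajority(Set<String> members, Set<String> acks) {
        long granted = members.stream().filter(acks::contains).count();
        return granted > members.size() / 2;
    }

    // Joint consensus: a majority of C_old AND a majority of C_new are both required
    static boolean hasJointQuorum(Set<String> oldMembers, Set<String> newMembers,
                                  Set<String> acks) {
        return hasMajority(oldMembers, acks) && hasMajority(newMembers, acks);
    }
}
```

For example, when adding `d` to `{a, b, c}`, acknowledgements from `a` and `b` form a majority of C_old but only half of C_new, so the joint decision is not yet safe; adding `d`'s acknowledgement satisfies both.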

Configuration Store Implementation

public class FileBasedConfigurationStore implements ConfigurationStore {
    private final Path configPath;
    private final Path snapshotDir;
    private final Logger logger = LoggerFactory.getLogger(FileBasedConfigurationStore.class);

    public FileBasedConfigurationStore(Path configPath, Path snapshotDir) {
        this.configPath = configPath;
        this.snapshotDir = snapshotDir;

        try {
            // getParent() is null for a bare file name, so resolve the absolute path first
            Files.createDirectories(configPath.toAbsolutePath().getParent());
            Files.createDirectories(snapshotDir);
        } catch (IOException e) {
            logger.error("Failed to create directories", e);
            throw new UncheckedIOException("Failed to create directories", e);
        }
    }

    @Override
    public void saveConfiguration(MembershipManager.Configuration config) throws IOException {
        // Atomic write: serialize to a temporary file, then rename it over the target
        Path tempPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
        try {
            try (ObjectOutputStream oos = new ObjectOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(tempPath)))) {
                oos.writeObject(config);
            }
            // Rename only after the stream is closed (moving an open file fails on some
            // platforms). close() does not fsync, so this is atomic but not crash-durable.
            Files.move(tempPath, configPath, StandardCopyOption.ATOMIC_MOVE,
                      StandardCopyOption.REPLACE_EXISTING);

            logger.info("Configuration version {} saved successfully", config.getVersion());
        } catch (IOException e) {
            logger.error("Failed to save configuration", e);
            throw e;
        }
    }

    @Override
    public MembershipManager.Configuration loadConfiguration() throws IOException {
        if (!Files.exists(configPath)) {
            logger.info("Configuration file does not exist: {}", configPath);
            return null;
        }

        try (ObjectInputStream ois = new ObjectInputStream(
                new BufferedInputStream(Files.newInputStream(configPath)))) {
            MembershipManager.Configuration config =
                (MembershipManager.Configuration) ois.readObject();
            logger.info("Loaded configuration version {}", config.getVersion());
            return config;
        } catch (ClassNotFoundException e) {
            logger.error("Failed to deserialize configuration", e);
            throw new IOException("Failed to deserialize configuration", e);
        }
    }

    @Override
    public void saveSnapshot(long index, byte[] data) throws IOException {
        // Zero-padded index in the file name, so lexical order equals numeric order
        String snapshotFileName = String.format("snapshot-%020d.bin", index);
        Path snapshotPath = snapshotDir.resolve(snapshotFileName);
        Path tempPath = snapshotDir.resolve(snapshotFileName + ".tmp");

        try {
            // Write to a temporary file
            Files.write(tempPath, data);

            // Atomically rename it into place
            Files.move(tempPath, snapshotPath, StandardCopyOption.ATOMIC_MOVE,
                      StandardCopyOption.REPLACE_EXISTING);

            logger.info("Snapshot at index {} saved successfully, size: {} bytes",
                       index, data.length);
        } catch (IOException e) {
            logger.error("Failed to save snapshot at index {}", index, e);
            throw e;
        }

        // Keep the 5 most recent snapshots; a cleanup failure must not fail the save
        try {
            cleanupOldSnapshots(5);
        } catch (IOException e) {
            logger.warn("Snapshot at index {} saved, but cleanup of old snapshots failed",
                       index, e);
        }
    }

    @Override
    public SnapshotInfo loadLatestSnapshot() throws IOException {
        // Files.list keeps a directory handle open, so close the stream when done
        try (Stream<Path> files = Files.list(snapshotDir)) {
            // Find the newest snapshot file (zero-padded indices sort lexically)
            Optional<Path> latestSnapshot = files
                .filter(p -> p.getFileName().toString().startsWith("snapshot-") &&
                           p.getFileName().toString().endsWith(".bin"))
                .max(Comparator.comparing(p -> p.getFileName().toString()));

            if (latestSnapshot.isPresent()) {
                Path snapshotPath = latestSnapshot.get();
                String fileName = snapshotPath.getFileName().toString();

                // Extract the index: "snapshot-" is 9 characters, followed by 20 digits
                long index = Long.parseLong(fileName.substring(9, 29));

                // Read the snapshot data
                byte[] data = Files.readAllBytes(snapshotPath);

                logger.info("Loaded snapshot at index {}, size: {} bytes", index, data.length);
                return new SnapshotInfo(index, data);
            } else {
                logger.info("No snapshot found in directory: {}", snapshotDir);
                return null;
            }
        } catch (IOException e) {
            logger.error("Failed to load latest snapshot", e);
            throw e;
        }
    }

    // Delete old snapshots, keeping only the newest keepCount files
    private void cleanupOldSnapshots(int keepCount) throws IOException {
        // Files.list keeps a directory handle open, so close the stream when done
        try (Stream<Path> files = Files.list(snapshotDir)) {
            List<Path> snapshots = files
                .filter(p -> p.getFileName().toString().startsWith("snapshot-") &&
                           p.getFileName().toString().endsWith(".bin"))
                .sorted(Comparator.comparing(p -> p.getFileName().toString()))
                .collect(Collectors.toList());

            // If there are more snapshots than we keep, delete the oldest ones
            if (snapshots.size() > keepCount) {
                int toDelete = snapshots.size() - keepCount;
                for (int i = 0; i < toDelete; i++) {
                    Files.delete(snapshots.get(i));
                    logger.info("Deleted old snapshot: {}", snapshots.get(i).getFileName());
                }
            }
        } catch (IOException e) {
            logger.error("Failed to cleanup old snapshots", e);
            throw e;
        }
    }
}

// Configuration store interface
public interface ConfigurationStore {
    void saveConfiguration(MembershipManager.Configuration config) throws IOException;
    MembershipManager.Configuration loadConfiguration() throws IOException;
    void saveSnapshot(long index, byte[] data) throws IOException;
    SnapshotInfo loadLatestSnapshot() throws IOException;

    // Snapshot metadata (index plus a defensively copied byte array), shared by implementations
    class SnapshotInfo {
        private final long index;
        private final byte[] data;

        public SnapshotInfo(long index, byte[] data) {
            this.index = index;
            this.data = data.clone();
        }

        public long getIndex() {
            return index;
        }

        public byte[] getData() {
            return data.clone();
        }
    }
}
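`saveConfiguration` and `saveSnapshot` both rely on the write-to-temp-then-rename pattern. Note that `flush()` and `close()` only hand data to the operating system; they do not force it to disk, so a crash right after the rename could still leave an empty or partial file. The following is a sketch of a more crash-safe variant, assuming a file system that supports `ATOMIC_MOVE`; `AtomicFileWriter` is a hypothetical helper name, not part of the article's code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical utility: readers see either the old or the new file content, never a mix
final class AtomicFileWriter {
    private AtomicFileWriter() {}

    static void writeAtomically(Path target, byte[] data) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                ch.write(buf); // a single write() may be partial, so loop
            }
            ch.force(true); // fsync: flush file contents and metadata to the device
        }
        // The channel is closed before the rename, which some platforms require
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE,
                   StandardCopyOption.REPLACE_EXISTING);
    }
}
```

On POSIX systems, making the rename itself durable also needs an fsync of the parent directory, which this sketch omits. If the file system cannot perform an atomic move, `Files.move` throws `AtomicMoveNotSupportedException` rather than silently falling back.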

Cross-Data-Center Replication Support

public class CrossDCReplication implements AutoCloseable {
    private final String localDC;
    private final List<String> allDCs;
    private final Map<String, DCConnection> dcConnections;
    private final ConsensusSystem localSystem;
    private final Logger logger = LoggerFactory.getLogger(CrossDCReplication.class);
    private final ScheduledExecutorService scheduler;
    private final AtomicLong replicationIndex = new AtomicLong(0);
    private final ConcurrentMap<String, AtomicLong> dcReplicationProgress = new ConcurrentHashMap<>();

    public CrossDCReplication(String localDC, List<String> allDCs,
                            ConsensusSystem localSystem,
                            Map<String, DCConnectionConfig> dcConfigs) {
        this.localDC = localDC;
        this.allDCs = new ArrayList<>(allDCs);
        this.localSystem = localSystem;
        this.dcConnections = new HashMap<>();

        // Set up connections to the remote data centers
        for (String dc : allDCs) {
            if (!dc.equals(localDC)) {
                DCConnectionConfig config = dcConfigs.get(dc);
                if (config != null) {
                    dcConnections.put(dc, new DCConnection(dc, config));
                    dcReplicationProgress.put(dc, new AtomicLong(0));
                }
            }
        }

        this.scheduler = Executors.newScheduledThreadPool(2, r -> {
            Thread t = new Thread(r, "dc-replication-scheduler");
            t.setDaemon(true);
            return t;
        });

        // Schedule the periodic catch-up replication task
        scheduler.scheduleWithFixedDelay(
            this::replicateChanges,
            1000, 1000, TimeUnit.MILLISECONDS
        );

        // Schedule the periodic health-check task
        scheduler.scheduleWithFixedDelay(
            this::checkDCHealth,
            5000, 5000, TimeUnit.MILLISECONDS
        );
    }

    // Replicate a request to the other data centers
    public CompletableFuture<Boolean> replicateRequest(Request request) {
        MDC.put("component", "cross-dc-replication");
        MDC.put("requestId", request.getId());

        try {
            // 1. Process the request in the local DC first
            return localSystem.processWrite(request)
                .thenCompose(localSuccess -> {
                    if (!localSuccess) {
                        logger.warn("Request {} failed in local DC", request.getId());
                        return CompletableFuture.completedFuture(false);
                    }

                    // 2. On local success, advance the replication index
                    long index = replicationIndex.incrementAndGet();

                    // 3. Replicate asynchronously to the other DCs
                    List<CompletableFuture<Boolean>> dcFutures = new ArrayList<>();

                    for (var entry : dcConnections.entrySet()) {
                        String dc = entry.getKey();
                        DCConnection connection = entry.getValue();

                        dcFutures.add(connection.replicateRequest(request, index)
                            .thenApply(success -> {
                                if (success) {
                                    // Record replication progress
                                    dcReplicationProgress.get(dc).updateAndGet(
                                        current -> Math.max(current, index));
                                    logger.info("Request {} successfully replicated to DC {}",
                                               request.getId(), dc);
                                } else {
                                    logger.warn("Failed to replicate request {} to DC {}",
                                               request.getId(), dc);
                                }
                                return success;
                            })
                            .exceptionally(e -> {
                                logger.error("Error replicating request {} to DC {}",
                                           request.getId(), dc, e);
                                return false;
                            }));
                    }

                    // 4. Combine the per-DC results according to the replication strategy
                    return handleDCReplications(dcFutures);
                });
        } finally {
            MDC.remove("component");
            MDC.remove("requestId");
        }
    }

    // Maximum time to wait for remote DC acknowledgements
    private static final long DC_REPLICATION_TIMEOUT_MS = 5000;

    // Combine cross-DC replication results according to the replication strategy
    private CompletableFuture<Boolean> handleDCReplications(
            List<CompletableFuture<Boolean>> dcFutures) {

        ReplicationStrategy strategy = ReplicationStrategy.QUORUM; // configurable
        int remoteDCs = dcFutures.size();

        switch (strategy) {
            case ALL:
                // Every remote DC must succeed
                return awaitSuccesses(dcFutures, remoteDCs);

            case ANY:
                // At least one remote DC must succeed
                return awaitSuccesses(dcFutures, 1);

            case ASYNC:
                // Asynchronous replication: do not wait for the results
                return CompletableFuture.completedFuture(true);

            case QUORUM:
            default:
                // A majority of ALL DCs must succeed. The local DC has already committed,
                // so (remoteDCs + 1) / 2 remote acknowledgements complete the majority.
                return awaitSuccesses(dcFutures, (remoteDCs + 1) / 2);
        }
    }

    // Non-blocking wait: completes with true as soon as `required` futures report success,
    // and with false once success is no longer reachable or the timeout elapses
    private CompletableFuture<Boolean> awaitSuccesses(
            List<CompletableFuture<Boolean>> futures, int required) {
        if (required <= 0) {
            return CompletableFuture.completedFuture(true);
        }
        if (required > futures.size()) {
            return CompletableFuture.completedFuture(false);
        }

        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        AtomicInteger successes = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        int tolerableFailures = futures.size() - required;

        for (CompletableFuture<Boolean> future : futures) {
            future.whenComplete((ok, e) -> {
                if (e == null && Boolean.TRUE.equals(ok)) {
                    if (successes.incrementAndGet() >= required) {
                        outcome.complete(true);
                    }
                } else if (failures.incrementAndGet() > tolerableFailures) {
                    outcome.complete(false);
                }
            });
        }

        return outcome.completeOnTimeout(false, DC_REPLICATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    // Periodically push changes that remote DCs have not yet acknowledged
    private void replicateChanges() {
        try {
            // Snapshot the current replication progress
            Map<String, Long> progress = new HashMap<>();
            for (var entry : dcReplicationProgress.entrySet()) {
                progress.put(entry.getKey(), entry.getValue().get());
            }

            // For each DC, replicate the changes it has not received yet
            for (var entry : dcConnections.entrySet()) {
                String dc = entry.getKey();
                DCConnection connection = entry.getValue();
                long currentProgress = progress.get(dc);

                if (currentProgress < replicationIndex.get()) {
                    // Collect the changes that need replicating
                    List<ReplicationEntry> changes =
                        getChangesSince(currentProgress, replicationIndex.get());

                    if (!changes.isEmpty()) {
                        connection.replicateChanges(changes)
                            .thenAccept(lastIndex -> {
                                if (lastIndex > currentProgress) {
                                    // Record replication progress
                                    dcReplicationProgress.get(dc).updateAndGet(
                                        current -> Math.max(current, lastIndex));
                                    logger.info("Replicated changes to DC {} up to index {}",
                                               dc, lastIndex);
                                }
                            })
                            .exceptionally(e -> {
                                logger.error("Failed to replicate changes to DC {}", dc, e);
                                return null;
                            });
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error in replication task", e);
        }
    }

    // Check the health of each remote data center
    private void checkDCHealth() {
        for (var entry : dcConnections.entrySet()) {
            String dc = entry.getKey();
            DCConnection connection = entry.getValue();

            connection.checkHealth()
                .thenAccept(healthy -> {
                    if (healthy) {
                        if (connection.markHealthy()) {
                            logger.info("DC {} is now healthy", dc);
                        }
                    } else {
                        if (connection.markUnhealthy()) {
                            logger.warn("DC {} is now unhealthy", dc);
                        }
                    }
                })
                .exceptionally(e -> {
                    logger.error("Error checking health of DC {}", dc, e);
                    connection.markUnhealthy();
                    return null;
                });
        }
    }

    // Get the changes in the index range (fromIndex, toIndex]
    private List<ReplicationEntry> getChangesSince(long fromIndex, long toIndex) {
        // A real implementation would read these entries from the log store
        List<ReplicationEntry> changes = new ArrayList<>();

        // Simplified example
        for (long i = fromIndex + 1; i <= toIndex; i++) {
            // Placeholder entry without a payload
            changes.add(new ReplicationEntry(i, null));
        }

        return changes;
    }

    @Override
    public void close() {
        // Shut down the scheduler
        scheduler.shutdownNow();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                logger.warn("Scheduler did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for scheduler termination");
        }

        // Close all DC connections
        for (DCConnection connection : dcConnections.values()) {
            connection.close();
        }
    }

    // Connection to a remote data center
    private class DCConnection implements AutoCloseable {
        private final String dcId;
        private final DCConnectionConfig config;
        private final AtomicBoolean healthy = new AtomicBoolean(true);
        private final NetworkClient networkClient;

        public DCConnection(String dcId, DCConnectionConfig config) {
            this.dcId = dcId;
            this.config = config;
            this.networkClient = createNetworkClient();
        }

        private NetworkClient createNetworkClient() {
            // Create the network client used for cross-DC traffic
            // Simplified example: no client is actually created
            return null;
        }

        public CompletableFuture<Boolean> replicateRequest(Request request, long index) {
            if (!healthy.get()) {
                return CompletableFuture.completedFuture(false);
            }

            // A real implementation would send the request to the target DC
            return CompletableFuture.completedFuture(true);
        }

        public CompletableFuture<Long> replicateChanges(List<ReplicationEntry> changes) {
            if (!healthy.get() || changes.isEmpty()) {
                return CompletableFuture.completedFuture(0L);
            }

            // A real implementation would send the changes to the target DC in one batch
            long lastIndex = changes.get(changes.size() - 1).getIndex();
            return CompletableFuture.completedFuture(lastIndex);
        }

        public CompletableFuture<Boolean> checkHealth() {
            // A real implementation would perform an actual health check
            return CompletableFuture.completedFuture(true);
        }

        public boolean markHealthy() {
            return healthy.compareAndSet(false, true);
        }

        public boolean markUnhealthy() {
            return healthy.compareAndSet(true, false);
        }

        @Override
        public void close() {
            // Close the network client (no-op in this simplified example)
        }
    }

    // Replication entry; defensive copies keep the payload immutable
    private static class ReplicationEntry {
        private final long index;
        private final byte[] data;

        public ReplicationEntry(long index, byte[] data) {
            this.index = index;
            this.data = data != null ? data.clone() : null;
        }

        public long getIndex() {
            return index;
        }

        public byte[] getData() {
            return data != null ? data.clone() : null;
        }
    }

    // Connection settings for a data center
    public static class DCConnectionConfig {
        private final String primaryEndpoint;
        private final List<String> backupEndpoints;
        private final int connectTimeoutMs;
        private final int readTimeoutMs;

        public DCConnectionConfig(String primaryEndpoint, List<String> backupEndpoints,
                                int connectTimeoutMs, int readTimeoutMs) {
            this.primaryEndpoint = primaryEndpoint;
            this.backupEndpoints = new ArrayList<>(backupEndpoints);
            this.connectTimeoutMs = connectTimeoutMs;
            this.readTimeoutMs = readTimeoutMs;
        }

        // Getters...
    }

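    // Replication strategy
    public enum ReplicationStrategy {
        ALL,     // every DC must succeed
        QUORUM,  // a majority of DCs must succeed
        ANY,     // at least one DC must succeed
        ASYNC    // asynchronous replication; do not wait for results
    }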
}
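
The comments on `ReplicationStrategy` describe when a cross-DC replication round counts as successful. A minimal sketch of that decision follows. `ReplicationPolicy.isSatisfied` is a hypothetical helper that is not part of the class above, and it assumes that QUORUM means a strict majority of the configured DCs and that ASYNC reports success immediately because it does not wait for acknowledgments.

```java
public final class ReplicationPolicy {

    // Mirrors the ReplicationStrategy enum of the multi-DC class above.
    public enum ReplicationStrategy { ALL, QUORUM, ANY, ASYNC }

    private ReplicationPolicy() {}

    /**
     * Hypothetical helper: decides whether one replication round succeeded.
     *
     * @param strategy  configured strategy
     * @param totalDcs  number of target data centers
     * @param successes number of DCs that acknowledged the change
     */
    public static boolean isSatisfied(ReplicationStrategy strategy, int totalDcs, int successes) {
        if (totalDcs < 0 || successes < 0 || successes > totalDcs) {
            throw new IllegalArgumentException("invalid counts: " + successes + "/" + totalDcs);
        }
        switch (strategy) {
            case ALL:
                return successes == totalDcs;
            case QUORUM:
                // Strict majority: more than half of all configured DCs
                return successes > totalDcs / 2;
            case ANY:
                return successes >= 1;
            case ASYNC:
                // Assumption: fire-and-forget, so the caller treats the round as successful
                return true;
            default:
                throw new IllegalStateException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSatisfied(ReplicationStrategy.QUORUM, 3, 2)); // true
        System.out.println(isSatisfied(ReplicationStrategy.ALL, 3, 2));    // false
        System.out.println(isSatisfied(ReplicationStrategy.ANY, 3, 0));    // false
    }
}
```

With an even number of DCs, QUORUM needs more than half: 2 of 4 is not enough, which is one reason odd DC counts are easier to reason about.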

5. ZAB vs. Paxos: Similarities and Differences

Similarities

[Figure: Relationship between ZAB and Paxos]

What the two have in common:

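  1. Majority quorums: both require acknowledgment from more than half of the nodes to guarantee safety, which prevents split-brain
  2. Phased operation: both reach agreement in two main phases, a prepare phase followed by a propose/accept (commit) phase
  3. Safety: under the crash-failure model both assume (no Byzantine nodes), replicas never decide conflicting values
  4. Fault tolerance: both keep working while a majority of nodes is alive; a cluster of 2f + 1 nodes tolerates f failures
  5. Behavior under network partitions: both put safety first; a partition without a majority stops serving writes rather than break consistency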
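
Items 1 and 4 come down to simple arithmetic. The sketch below (`QuorumMath` is an illustrative name, not part of either protocol's API) computes the majority quorum for n voting nodes, how many crashes the cluster survives, and the intersection property both protocols rely on for safety: any two majorities share at least one node, so a new leader or proposer always contacts some node that saw the last committed value.

```java
public final class QuorumMath {

    private QuorumMath() {}

    /** Smallest number of nodes that forms a strict majority of n. */
    public static int quorumSize(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("cluster size must be positive");
        }
        return n / 2 + 1;
    }

    /** Crashed nodes the cluster can survive while a majority is still alive; equals (n - 1) / 2. */
    public static int toleratedFailures(int n) {
        return n - quorumSize(n);
    }

    /** Two majorities of the same n nodes always overlap, because together they exceed n. */
    public static boolean quorumsIntersect(int n) {
        return 2 * quorumSize(n) > n;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 7; n++) {
            System.out.printf("n=%d quorum=%d tolerated=%d intersect=%b%n",
                    n, quorumSize(n), toleratedFailures(n), quorumsIntersect(n));
        }
    }
}
```

Note that 4 nodes tolerate only one failure, the same as 3 nodes, which is why ensembles are usually sized at 3, 5 or 7.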

Differences

[Figure: Differences between ZAB and Paxos]

Key differences:

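  1. Design goal

    • ZAB: an atomic broadcast protocol built specifically for ZooKeeper's state machine replication; it emphasizes replicating the whole system state in order
    • Paxos: a general-purpose consensus algorithm; it focuses on how to agree on a single value
  2. Leadership

    • ZAB: an explicit Leader-Follower architecture; every write is coordinated by the leader
    • Basic Paxos: roles are symmetric in the original design, with no fixed leader
    • Multi-Paxos: adds a distinguished leader as an optimization, but in theory the roles stay symmetric; correctness does not depend on the leader, and any node can take over by running the prepare phase
  3. Message ordering

    • ZAB: guarantees strict FIFO processing; the ZXID (epoch + counter) defines a global order
    • Basic Paxos: no ordering guarantee; it only decides a single value
    • Multi-Paxos: order can be derived from instance IDs (log slots), but extra mechanisms are needed, for example to keep a client's requests in FIFO order across leader changes
  4. Recovery

    • ZAB: a dedicated crash-recovery mode with election, discovery, synchronization and activation phases
    • Paxos: crashes are handled by the normal protocol flow; there is no special recovery mode
  5. Transaction identifiers

    • ZAB: the ZXID (epoch + counter) is a globally unique identifier; in ZooKeeper it is a 64-bit value with the epoch in the high 32 bits and the counter in the low 32 bits
    • Paxos: a proposal number (ballot number) identifies the proposal and an instance ID identifies the log position, separately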
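
The ZXID layout in item 5 can be sketched in a few lines. This follows ZooKeeper's convention of a 64-bit value with the epoch in the high 32 bits and the per-epoch counter in the low 32 bits; the `Zxid` class itself is illustrative. Because the epoch occupies the high bits, a plain numeric comparison orders every transaction of a newer leader after all transactions of older leaders.

```java
public final class Zxid {

    private Zxid() {}

    /** Packs an epoch (high 32 bits) and a per-epoch counter (low 32 bits) into one 64-bit ZXID. */
    public static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    /** Extracts the epoch (leader term) from a ZXID. */
    public static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    /** Extracts the per-epoch transaction counter from a ZXID. */
    public static long counterOf(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long lastOfOldEpoch = make(1, 500);  // the 500th write of epoch 1
        long firstOfNewEpoch = make(2, 1);   // the first write after a leader change
        // Numeric order is the total order (valid while the epoch stays below 2^31)
        System.out.println(Long.compare(firstOfNewEpoch, lastOfOldEpoch) > 0); // true
        System.out.println(epochOf(firstOfNewEpoch) + " / " + counterOf(firstOfNewEpoch)); // 2 / 1
    }
}
```

Paxos keeps these two concerns apart: the ballot number says which proposer round a value belongs to, while the instance ID says which log slot it fills.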

6. Performance Comparison and Engineering Practice

Performance comparison

[Figure: Performance comparison]

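| Metric | ZAB | Basic Paxos | Multi-Paxos |
|---|---|---|---|
| Write latency | 2 RTT (normal operation) | 2 RTT | 1 RTT (stable leader) |
| Read latency | 0 RTT (local read) to 1 RTT (consistent read) | 2 RTT | 0 RTT (local read) to 1 RTT (consistent read) |
| Write throughput | High (batching) | | Medium to high (batching) |
| Read throughput | Very high (local reads) | | High (local reads) |
| Message complexity | O(n) | O(n²) | O(n) (stable leader) |
| CPU usage | Medium | | Medium to high |
| Memory usage | Medium | Medium | Medium |
| Recovery time | Short (dedicated recovery mode) | Long | Medium |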
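
The message-complexity row depends heavily on how learners are notified. The counting sketch below makes its assumptions explicit: no failures and no competing proposers, only messages between distinct nodes are counted, the leader-based case sends an explicit commit to every follower, and in Basic Paxos every acceptor announces its acceptance to every other node (all nodes act as learners). With a single distinguished learner, Basic Paxos would also be O(n). `MessageCount` is a hypothetical name.

```java
public final class MessageCount {

    private MessageCount() {}

    /**
     * Leader-based replication (ZAB broadcast, or Multi-Paxos with a stable leader and an
     * explicit commit): proposal, ACK and commit exchanged with each of the n - 1 followers.
     */
    public static long leaderBasedPerWrite(int n) {
        requirePositive(n);
        return 3L * (n - 1);
    }

    /**
     * Basic Paxos, single proposer, no contention: prepare, promise and accept exchanged with the
     * other n - 1 nodes, then each of the n acceptors sends "accepted" to the other n - 1 learners.
     */
    public static long basicPaxosAllToAllPerValue(int n) {
        requirePositive(n);
        return 3L * (n - 1) + (long) n * (n - 1);
    }

    private static void requirePositive(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("cluster size must be positive");
        }
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 5, 7, 9}) {
            System.out.printf("n=%d leader-based=%d basic-paxos=%d%n",
                    n, leaderBasedPerWrite(n), basicPaxosAllToAllPerValue(n));
        }
    }
}
```

The leader-based count grows linearly, while the all-to-all "accepted" broadcast adds the n(n - 1) term behind the O(n²) entry.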

Horizontal scalability

How performance changes as the cluster grows:

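| Cluster size | ZAB | Paxos |
|---|---|---|
| 3 nodes | High throughput, low latency | Medium throughput, medium latency |
| 5 nodes | Good throughput, slightly higher latency | Lower throughput, higher latency |
| 7 nodes | Lower throughput, higher latency | Significantly lower throughput, high latency |
| 9+ nodes | Not recommended (noticeable degradation) | Not recommended (noticeable degradation) |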
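
One reason latency grows with cluster size can be shown with a simplified model: the leader counts its own vote, so a write is committed once the (quorum - 1)-th fastest follower has acknowledged it. The sketch ignores disk fsync, batching and the COMMIT message, and applies to the steady-state broadcast of ZAB or Multi-Paxos; `QuorumLatency` is an illustrative name.

```java
import java.util.Arrays;

public final class QuorumLatency {

    private QuorumLatency() {}

    /**
     * Estimated replication latency of one write at the leader.
     *
     * @param followerRttsMs round-trip times from the leader to each follower, in milliseconds
     * @return the RTT of the follower whose ACK completes the majority
     */
    public static double commitLatencyMs(double[] followerRttsMs) {
        int n = followerRttsMs.length + 1;   // followers plus the leader itself
        int acksNeeded = n / 2;              // quorum (n / 2 + 1) minus the leader's own vote
        if (acksNeeded == 0) {
            return 0.0;                      // single-node "cluster": no follower ACK needed
        }
        double[] sorted = followerRttsMs.clone();
        Arrays.sort(sorted);
        return sorted[acksNeeded - 1];
    }

    public static void main(String[] args) {
        // 3 nodes: 1 ACK needed, so the fastest follower decides
        System.out.println(commitLatencyMs(new double[] {1.0, 8.0}));           // 1.0
        // 5 nodes: 2 ACKs needed, so the second-fastest follower decides
        System.out.println(commitLatencyMs(new double[] {1.0, 2.0, 8.0, 9.0})); // 2.0
    }
}
```

In this model the slowest followers of a 5-node cluster do not affect commit latency, but the leader must wait for a deeper position in the RTT distribution than in a 3-node cluster, so latency creeps up as nodes are added.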

JVM tuning recommendations

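/**
 * Recommended JVM settings:
 * -Xms4g -Xmx4g                   // fixed heap size avoids resizing at runtime
 * -XX:+UseG1GC                    // use the G1 collector (the default on server-class machines since JDK 9)
 * -XX:MaxGCPauseMillis=200        // GC pause-time goal, a target rather than a guarantee (G1 default)
 * -XX:InitiatingHeapOccupancyPercent=45  // heap occupancy that starts concurrent marking (G1 default)
 * -XX:+AlwaysPreTouch             // touch all heap pages at startup
 * -XX:+DisableExplicitGC          // ignore System.gc(); note this also affects NIO direct-buffer reclamation
 * -XX:+HeapDumpOnOutOfMemoryError // write a heap dump on OOM
 * -XX:HeapDumpPath=/path/to/dumps // heap dump location
 * -XX:+UseCompressedOops          // compressed object pointers (default for heaps below ~32 GB)
 * -XX:+UseCompressedClassPointers // compressed class pointers (on by default with compressed oops)
 * -Djava.net.preferIPv4Stack=true // prefer IPv4 sockets
 */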
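
A quick way to confirm that flags like these actually reached the running JVM is to inspect them at startup. The class name `JvmSettingsCheck` and its warning condition are hypothetical; the sketch only uses the standard `java.lang.management` APIs and `Runtime.maxMemory()`.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public final class JvmSettingsCheck {

    private JvmSettingsCheck() {}

    /** Returns true if the exact option string was passed on the JVM command line. */
    public static boolean hasOption(String option) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        return jvmArgs.contains(option);
    }

    public static void main(String[] args) {
        System.out.println("Input arguments: "
                + ManagementFactory.getRuntimeMXBean().getInputArguments());
        System.out.println("Max heap (MiB): " + Runtime.getRuntime().maxMemory() / (1024 * 1024));
        // Names the collectors in use, e.g. the G1 young/old generation beans when G1 is active
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println("Collector: " + gc.getName());
        }
        if (!hasOption("-XX:+UseG1GC")) {
            System.out.println("Note: -XX:+UseG1GC was not passed explicitly");
        }
    }
}
```

Logging this once at startup makes it easy to spot a deployment script that silently dropped or overrode one of the recommended flags.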

Choosing between them

[Figure: Selection decision guide]

Engineering best practices

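  1. Set timeouts carefully

    • Timeouts that are too short cause unnecessary leader elections
    • Timeouts that are too long slow down failure recovery
    • Tune them to the observed network conditions, ideally adjusting them dynamically
  2. Batch requests

    • Combine multiple write requests into one batch
    • Reduce the number of network round trips
    • Increase overall throughput
  3. Separate reads from writes

    • Route write requests through the leader
    • Serve read requests locally when the required consistency level allows it
    • Cache reads to reduce disk I/O
  4. Monitor key metrics

    • Commit latency
    • Leader change frequency
    • Request queue depth
    • Network latency and bandwidth usage
  5. Preventive maintenance

    • Compact logs regularly
    • Take snapshots
    • Monitor disk space
    • Run failure drills to test the recovery procedure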
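
For the first recommendation, one possible approach is to derive the timeout from measured round-trip times, the way TCP computes its retransmission timeout (RFC 6298: smoothed RTT plus four times the RTT variance, here without the clock-granularity term). This is a sketch under assumptions: `AdaptiveTimeout` is a hypothetical class, RTT samples are assumed to come from heartbeat acknowledgments, and the clamp bounds are illustrative. ZooKeeper itself does not adapt this way; it uses static settings such as `tickTime`, `initLimit` and `syncLimit`.

```java
public final class AdaptiveTimeout {

    private static final double ALPHA = 1.0 / 8; // gain for the smoothed RTT (RFC 6298)
    private static final double BETA = 1.0 / 4;  // gain for the RTT variance (RFC 6298)

    private final long minTimeoutMs;
    private final long maxTimeoutMs;
    private double srtt = -1;  // smoothed RTT; negative until the first sample arrives
    private double rttVar;     // smoothed mean deviation of the RTT

    public AdaptiveTimeout(long minTimeoutMs, long maxTimeoutMs) {
        if (minTimeoutMs <= 0 || maxTimeoutMs < minTimeoutMs) {
            throw new IllegalArgumentException("invalid timeout bounds");
        }
        this.minTimeoutMs = minTimeoutMs;
        this.maxTimeoutMs = maxTimeoutMs;
    }

    /** Feeds one measured round-trip time, e.g. from a heartbeat ACK. */
    public synchronized void onRttSample(double rttMs) {
        if (srtt < 0) {
            srtt = rttMs;
            rttVar = rttMs / 2;
        } else {
            // RTTVAR is updated with the old SRTT before SRTT itself changes
            rttVar = (1 - BETA) * rttVar + BETA * Math.abs(srtt - rttMs);
            srtt = (1 - ALPHA) * srtt + ALPHA * rttMs;
        }
    }

    /** Current timeout: SRTT + 4 * RTTVAR, clamped to [minTimeoutMs, maxTimeoutMs]. */
    public synchronized long currentTimeoutMs() {
        if (srtt < 0) {
            return maxTimeoutMs; // no samples yet: stay conservative
        }
        long raw = (long) Math.ceil(srtt + 4 * rttVar);
        return Math.max(minTimeoutMs, Math.min(maxTimeoutMs, raw));
    }

    public static void main(String[] args) {
        AdaptiveTimeout timeout = new AdaptiveTimeout(50, 5_000);
        for (double rtt : new double[] {20, 22, 19, 21, 20}) {
            timeout.onRttSample(rtt);
        }
        System.out.println("after stable RTTs: " + timeout.currentTimeoutMs() + " ms");
        timeout.onRttSample(400); // a latency spike widens the timeout
        System.out.println("after a spike: " + timeout.currentTimeoutMs() + " ms");
    }
}
```

The lower bound keeps a very fast network from producing a timeout so short that an ordinary GC pause triggers an election, and the upper bound caps how long failure detection can take after a burst of slow samples.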

7. Unit Test Example

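import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.google.common.base.Stopwatch;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)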
public class ZABBroadcastTest {
    private ZABBroadcast zabBroadcast;
    private AtomicLong zxid;
    private AtomicInteger epoch;

    @Mock
    private NetworkClient mockNetworkClient;

    @Mock
    private StateMachine mockStateMachine;

    @Before
    public void setUp() {
        zxid = new AtomicLong(0);
        epoch = new AtomicInteger(0);

        zabBroadcast = new ZABBroadcast("server1", zxid, epoch, mockNetworkClient, mockStateMachine);

        // Register two followers (3-node ensemble including the leader)
        ServerData follower1 = new ServerData("server2", "localhost", 8001);
        ServerData follower2 = new ServerData("server3", "localhost", 8002);
        zabBroadcast.addFollower(follower1);
        zabBroadcast.addFollower(follower2);
    }

    @After
    public void tearDown() {
        zabBroadcast.close();
    }

    @Test
    public void testProcessWriteSuccess() throws Exception {
        // Arrange: every follower acknowledges the proposal
        ACK successAck = new ACK(true, zxid.get() + 1);
        when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
            .thenReturn(successAck);

        // Act
        Request request = new Request("req1", "test data".getBytes());
        CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);

        // Assert: the write is committed
        assertTrue(result.get(1, TimeUnit.SECONDS));

        // Verify interactions: one proposal and one commit per follower
        verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
        verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class));
    }

    @Test
    public void testProcessWriteFailure() throws Exception {
        // Arrange: both followers reject the proposal. In a 3-node ensemble the leader's own
        // vote plus a single follower ACK would already be a majority, so both must reject.
        when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
            .thenReturn(new ACK(false, zxid.get()));

        // Act
        Request request = new Request("req1", "test data".getBytes());
        CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);

        // Assert: the write fails because no majority acknowledged it
        assertFalse(result.get(1, TimeUnit.SECONDS));

        // Verify interactions: proposals were sent, but no commit
        verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
        verify(mockNetworkClient, never()).sendCommit(anyString(), any(CommitPacket.class));
    }

    @Test
    public void testBatchWritePerformance() throws Exception {
        // Arrange: every follower acknowledges the proposal
        ACK successAck = new ACK(true, zxid.get() + 1);
        when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
            .thenReturn(successAck);

        // Build 100 requests to submit as one batch
        List<Request> requests = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            requests.add(new Request("req" + i, ("data" + i).getBytes()));
        }

        // Act: submit the batch and time it (Stopwatch comes from Guava)
        Stopwatch stopwatch = Stopwatch.createStarted();
        CompletableFuture<Map<String, Boolean>> result = zabBroadcast.processBatchWrite(requests);
        Map<String, Boolean> results = result.get(5, TimeUnit.SECONDS);
        stopwatch.stop();

        // Assert: every request in the batch succeeded
        assertEquals(100, results.size());
        assertTrue(results.values().stream().allMatch(v -> v));

        // Print timing; the network is mocked, so this measures local overhead only
        System.out.println("Batch write of 100 requests took " +
                         stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");

        // Verify: the whole batch costs one round trip, i.e. one proposal and one commit per follower
        verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
        verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class));
    }

    @Test
    public void testCircuitBreakerTrip() throws Exception {
        // Arrange: every proposal is rejected
        when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
            .thenReturn(new ACK(false, zxid.get()));

        // Send several failing writes to trip the circuit breaker
        Request request = new Request("req1", "test data".getBytes());
        for (int i = 0; i < 5; i++) {
            try {
                CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
                result.get(1, TimeUnit.SECONDS);
            } catch (Exception e) {
                // Expected failures are ignored here
            }
        }

        // The 6th request should be rejected immediately by the open circuit breaker
        try {
            CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
            result.get(1, TimeUnit.SECONDS);
            fail("Should have thrown CircuitBreakerOpenException");
        } catch (ExecutionException e) {
            assertTrue(e.getCause() instanceof ProcessingException);
            assertTrue(e.getCause().getCause() instanceof ZABBroadcast.CircuitBreakerOpenException);
        }
    }

    @Test
    public void testReadWithConsistencyLevels() throws Exception {
        // Reads at different consistency levels. sendHeartbeat returns void, and void methods
        // on a Mockito mock do nothing by default, so no stubbing is needed here.

        // Linearizable read
        CompletableFuture<Result> linearResult =
            zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.LINEARIZABLE);

        // Sequentially consistent read
        CompletableFuture<Result> sequentialResult =
            zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.SEQUENTIAL);

        // Eventually consistent read
        CompletableFuture<Result> eventualResult =
            zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.EVENTUAL);

        // Assert: every read completes with a non-null result
        assertNotNull(linearResult.get(1, TimeUnit.SECONDS));
        assertNotNull(sequentialResult.get(1, TimeUnit.SECONDS));
        assertNotNull(eventualResult.get(1, TimeUnit.SECONDS));
    }
}

8. Client API Examples

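import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedSystemClient implements AutoCloseable {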
    private final ZabClient zabClient;
    private final PaxosClient paxosClient;
    private final Logger logger = LoggerFactory.getLogger(DistributedSystemClient.class);

    public DistributedSystemClient(String zkConnectString, String paxosConnectString) {
        this.zabClient = new ZabClient(zkConnectString);
        this.paxosClient = new PaxosClient(paxosConnectString);
    }

    // ZAB client example: a configuration service on ZooKeeper, accessed through Apache Curator
    public class ZabClient implements AutoCloseable {
        private final String connectString;
        private final CuratorFramework client;

        public ZabClient(String connectString) {
            this.connectString = connectString;
            this.client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
            this.client.start();
        }

        // Store a configuration value (create the node, or overwrite it if it already exists)
        public void storeConfig(String path, String data) throws Exception {
            try {
                // Check whether the path exists
                if (client.checkExists().forPath(path) == null) {
                    client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(path, data.getBytes(StandardCharsets.UTF_8));
                    logger.info("Created config at path: {}", path);
                } else {
                    client.setData()
                        .forPath(path, data.getBytes(StandardCharsets.UTF_8));
                    logger.info("Updated config at path: {}", path);
                }
            } catch (Exception e) {
                logger.error("Failed to store config at path: {}", path, e);
                throw e;
            }
        }

        // Read a configuration value
        public String getConfig(String path) throws Exception {
            try {
                byte[] data = client.getData().forPath(path);
                return new String(data, StandardCharsets.UTF_8);
            } catch (Exception e) {
                logger.error("Failed to read config from path: {}", path, e);
                throw e;
            }
        }

        // Watch for configuration changes
        public void watchConfig(String path, Consumer<String> changeCallback) throws Exception {
            try {
                // Register a watcher; ZooKeeper watches fire only once, so it is re-registered below
                client.getData().usingWatcher(new CuratorWatcher() {
                    @Override
                    public void process(WatchedEvent event) throws Exception {
                        if (event.getType() == EventType.NodeDataChanged) {
                            String newData = getConfig(path);
                            changeCallback.accept(newData);
                            // Re-register the one-shot watch
                            watchConfig(path, changeCallback);
                        }
                    }
                }).forPath(path);

                logger.info("Set watch on path: {}", path);
            } catch (Exception e) {
                logger.error("Failed to set watch on path: {}", path, e);
                throw e;
            }
        }

        // Distributed lock
        public DistributedLock getLock(String lockPath) {
            return new DistributedLock(client, lockPath);
        }

        @Override
        public void close() {
            client.close();
        }

        // Distributed lock backed by Curator's InterProcessMutex recipe
        public class DistributedLock {
            private final InterProcessMutex mutex;
            private final String lockPath;

            public DistributedLock(CuratorFramework client, String lockPath) {
                this.lockPath = lockPath;
                this.mutex = new InterProcessMutex(client, lockPath);
            }

            public void lock(long timeout, TimeUnit unit) throws Exception {
                if (mutex.acquire(timeout, unit)) {
                    logger.info("Acquired lock: {}", lockPath);
                } else {
                    logger.warn("Failed to acquire lock: {} within timeout", lockPath);
                    throw new TimeoutException("Failed to acquire lock: " + lockPath);
                }
            }

            public void unlock() {
                try {
                    mutex.release();
                    logger.info("Released lock: {}", lockPath);
                } catch (Exception e) {
                    logger.error("Error releasing lock: {}", lockPath, e);
                }
            }
        }
    }

    // Paxos client example: a distributed key-value store
    public class PaxosClient implements AutoCloseable {
        private final String connectString;
        private final PaxosKVStore kvStore;

        public PaxosClient(String connectString) {
            this.connectString = connectString;
            this.kvStore = new PaxosKVStore(connectString);
        }

        // Write a key-value pair
        public CompletableFuture<Boolean> put(String key, String value,
                                            ConsistencyLevel consistencyLevel) {
            return kvStore.put(key, value, consistencyLevel);
        }

        // Read a key
        public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) {
            return kvStore.get(key, consistencyLevel);
        }

        // Delete a key
        public CompletableFuture<Boolean> delete(String key) {
            return kvStore.delete(key);
        }

        @Override
        public void close() {
            kvStore.close();
        }

        // Paxos KV store (placeholder implementation)
        private class PaxosKVStore implements AutoCloseable {
            private final PaxosClient client;

            public PaxosKVStore(String connectString) {
                // A real implementation would connect to the Paxos cluster
                this.client = null; // simplified example
            }

            public CompletableFuture<Boolean> put(String key, String value,
                                                ConsistencyLevel consistencyLevel) {
                // A real implementation would commit the write through the Paxos protocol
                logger.info("Putting key: {} with consistency: {}", key, consistencyLevel);
                return CompletableFuture.completedFuture(true);
            }

            public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) {
                // A real implementation would choose the read path based on the consistency level
                logger.info("Getting key: {} with consistency: {}", key, consistencyLevel);
                return CompletableFuture.completedFuture("value");
            }

            public CompletableFuture<Boolean> delete(String key) {
                // A delete is also a write and is committed through the Paxos protocol
                logger.info("Deleting key: {}", key);
                return CompletableFuture.completedFuture(true);
            }

            @Override
            public void close() {
                // Release resources
            }
        }
    }

    // Usage example
    public void runExample() throws Exception {
        // ZAB client usage
        try (ZabClient zab = new ZabClient("localhost:2181")) {
            // Store configuration
            zab.storeConfig("/app/config", "{\"timeout\": 30, \"maxRetries\": 3}");

            // Read configuration
            String config = zab.getConfig("/app/config");
            System.out.println("Config: " + config);

            // Watch for configuration changes
            zab.watchConfig("/app/config", newConfig -> {
                System.out.println("Config changed: " + newConfig);
            });

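            // Use the distributed lock. Acquire it before the try block so that
            // unlock() only runs when the lock is actually held.
            ZabClient.DistributedLock lock = zab.getLock("/app/locks/resource1");
            lock.lock(10, TimeUnit.SECONDS);
            try {
                // Critical section
                System.out.println("Performing critical operation...");
                Thread.sleep(1000);
            } finally {
                lock.unlock();
            }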
        }

        // Paxos client usage
        try (PaxosClient paxos = new PaxosClient("localhost:8000,localhost:8001,localhost:8002")) {
            // Write data
            paxos.put("user:1001", "{\"name\":\"John\",\"email\":\"john@example.com\"}",
                     ConsistencyLevel.LINEARIZABLE)
                .thenAccept(success -> {
                    System.out.println("Write success: " + success);
                })
                .join();

            // Read data
            paxos.get("user:1001", ConsistencyLevel.SEQUENTIAL)
                .thenAccept(value -> {
                    System.out.println("User data: " + value);
                })
                .join();

            // Delete data
            paxos.delete("user:1001")
                .thenAccept(success -> {
                    System.out.println("Delete success: " + success);
                })
                .join();
        }
    }

    @Override
    public void close() throws Exception {
        zabClient.close();
        paxosClient.close();
    }
}

9. Summary

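| Property | ZAB | Paxos |
|---|---|---|
| Design goal | State machine replication | Distributed consensus |
| Leadership | Explicit Leader-Follower | Basic Paxos: no fixed roles; Multi-Paxos: leader as an optimization |
| Message ordering | Strict FIFO order | Basic Paxos: no ordering guarantee; Multi-Paxos: can guarantee it |
| Recovery | Dedicated recovery mode | Recovers through the algorithm's normal flow |
| Implementation complexity | Medium | |
| Typical use cases | Systems that need strong consistency and ordering guarantees | General distributed systems, especially single-value agreement |
| Typical deployments | ZooKeeper | Chubby; etcd (uses Raft, a Paxos-family protocol) |
| Performance | Writes must go through the leader; reads are fast | The basic version is slow; optimized variants can reach high performance |
| Extensibility | Tied to ZooKeeper's architecture | The underlying theory is easier to extend and adapt |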

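ZAB and Paxos are both well-established consensus algorithms at the core of modern distributed system design. Understanding how they work, how they are implemented, and where each one fits is essential for building reliable distributed systems.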

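Whichever algorithm you choose, weigh it against the application's scenario, consistency requirements and performance goals. The engineering practices and optimizations shown in this article are a starting point for building fast, reliable distributed systems that meet demanding business requirements.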
