目錄

  • 1. 為什麼需要DefaultFuture機制?
  • 1.1 單一長連接的挑戰
  • 1.2 示例場景
  • 2. DefaultFuture的核心設計
  • 2.1 關鍵組件
  • 2.2 響應匹配流程
  • 2.3 超時處理
  • 3. DefaultFuture的線程安全與性能優化
  • 3.1 線程安全設計
  • 3.2 性能優化
  • 4. 對比其他RPC框架的響應關聯機制
  • 5. 實際代碼示例
  • 5.1 客户端發送請求
  • 5.2 服務端處理請求
  • 5.3 響應返回與匹配
  • 6. 常見問題與解決方案
  • 6.1 問題1:Future超時未清理
  • 6.2 問題2:請求ID衝突
  • 6.3 問題3:NIO線程阻塞
  • 7. 總結
  • NIO補充
  • dubbo的連接機制補充

1. 為什麼需要DefaultFuture機制?

1.1 單一長連接的挑戰

  • 複用性:所有請求通過同一個Netty Channel(封裝了Socket)發送,響應也通過同一通道返回。
  • 亂序風險:若服務端返回響應的順序與客户端發送請求的順序不一致(例如請求A耗時長,請求B先返回),客户端無法直接區分哪個響應對應哪個請求。

1.2 示例場景

java

// 客户端發送兩個請求(Req1和Req2)
channel.writeAndFlush(new Request(id=1, method="getUser"));  // Req1
channel.writeAndFlush(new Request(id=2, method="getOrder")); // Req2

// 服務端返回兩個響應(順序可能顛倒)
channel.writeAndFlush(new Response(id=2, result="Order123")); // Res2(先返回)
channel.writeAndFlush(new Response(id=1, result="User456"));   // Res1(後返回)
  • 問題:客户端如何知道第一個收到的響應Res2對應的是Req2而非Req1

2. DefaultFuture的核心設計

2.1 關鍵組件

  • Request ID:每個請求攜帶唯一ID(通常為自增序列),服務端返回響應時需攜帶相同ID。
  • Future映射表
  • 客户端維護一個全局的ConcurrentMap<Long, DefaultFuture>,鍵為請求ID,值為對應的Future對象。
  • 示例代碼:
java

// 偽代碼:DefaultFuture的靜態映射表
private static final ConcurrentMap<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

// 發送請求時註冊Future
public static DefaultFuture sendRequest(Request request, Channel channel) {
    DefaultFuture future = new DefaultFuture(channel, request);
    FUTURES.put(request.getId(), future); // 註冊到映射表
    channel.writeAndFlush(request);
    return future;
}

2.2 響應匹配流程

  1. 服務端返回響應
  • 響應中包含請求ID(如Response(id=2, result="Order123"))。
  1. 客户端接收響應
  • Netty的ChannelHandler解析響應,提取請求ID。
  • FUTURES映射表中查找對應的DefaultFuture
  • 喚醒阻塞的線程(通過Future.set(result)),返回響應結果。
  • 示例代碼:
java

// 偽代碼:響應處理邏輯
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    Response res = (Response) msg;
    DefaultFuture future = FUTURES.get(res.getId()); // 根據ID查找Future
    if (future != null) {
        future.doReceived(res); // 設置結果並喚醒線程
        FUTURES.remove(res.getId()); // 清理已完成的Future
    }
}

2.3 超時處理

  • 超時機制:若響應未在指定時間內到達,DefaultFuture會觸發超時異常。
  • 示例代碼:
java

// 偽代碼:超時檢查
public class TimeoutCheckTask implements Runnable {
    public void run() {
        for (DefaultFuture future : FUTURES.values()) {
            if (future.isTimeout()) {
                future.cancel(true); // 觸發超時異常
                FUTURES.remove(future.getRequest().getId());
            }
        }
    }
}

3. DefaultFuture的線程安全與性能優化

3.1 線程安全設計

  • ConcurrentMap:使用ConcurrentHashMap保證多線程下FUTURES映射表的併發訪問安全。
  • 原子操作Future.set(result)Future.cancel()通過CAS或同步鎖保證原子性。

3.2 性能優化

  • ID複用:請求ID通常為自增長整型,避免頻繁創建大對象。
  • 弱引用清理:對已完成或超時的Future及時從映射表中移除,防止內存泄漏。
  • 異步通知:通過Netty的EventLoop線程處理響應,避免阻塞業務線程。

4. 對比其他RPC框架的響應關聯機制

框架

機制

特點

Dubbo

DefaultFuture + Request ID

輕量級,依賴全局映射表,適合高併發長連接場景。

gRPC

HTTP/2 Stream ID + Header

基於HTTP/2流標識,天然支持多路複用,但需依賴協議層支持。

Thrift

序列化ID + 阻塞隊列

簡單但性能較低,每個請求需阻塞等待響應,不適合高併發。

Spring Cloud

Feign + Ribbon + Hystrix

依賴負載均衡和熔斷器,響應關聯通過HTTP回調實現,靈活性高但開銷較大。


5. 實際代碼示例

5.1 客户端發送請求

java

// 偽代碼:Dubbo客户端調用
RpcContext.getContext().setAttachment("interface", "com.example.UserService");
UserService userService = (UserService) context.getBean("userService");

// 發送請求並獲取Future
DefaultFuture future = DefaultFuture.sendRequest(
    new Request(id=1, method="getUser", params={"userId": "1001"}),
    channel
);

// 異步等待結果(或直接get()阻塞)
future.get(5000, TimeUnit.MILLISECONDS); // 超時時間5秒

5.2 服務端處理請求

java

// 偽代碼:Dubbo服務端實現
@Service
public class UserServiceImpl implements UserService {
    public User getUser(String userId) {
        // 模擬耗時操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new User(userId, "Alice");
    }
}

5.3 響應返回與匹配

java

// 偽代碼:服務端返回響應
public void channelRead0(ChannelHandlerContext ctx, Request req) {
    // 處理請求並生成響應
    User user = handleRequest(req);
    Response res = new Response(req.getId(), user);
    
    // 通過同一Channel返回響應
    ctx.writeAndFlush(res);
}

6. 常見問題與解決方案

6.1 問題1:Future超時未清理

  • 現象:大量超時請求的Future堆積在FUTURES表中,導致內存泄漏。
  • 解決
  • 啓用定時任務清理超時Future(Dubbo默認已實現)。
  • 手動調用RpcContext.getContext().removeContext()清理上下文。

6.2 問題2:請求ID衝突

  • 現象:高併發下請求ID重複,導致響應匹配錯誤。
  • 解決
  • 使用原子類(如AtomicLong)生成全局唯一ID。
  • 結合機器IP和端口生成分佈式唯一ID(如Snowflake算法)。

6.3 問題3:NIO線程阻塞

  • 現象:響應處理邏輯耗時過長,阻塞Netty的EventLoop線程。
  • 解決
  • 將耗時操作提交到業務線程池處理:
java

public void channelRead0(ChannelHandlerContext ctx, Response res) {
    executorService.submit(() -> {
        // 耗時處理邏輯
        processResponse(res);
    });
}

7. 總結

  • DefaultFuture機制是Dubbo實現單一長連接下請求-響應有序關聯的核心組件。
  • 關鍵點
  • 通過全局ConcurrentMap維護請求ID與Future的映射關係。
  • 依賴請求ID和響應匹配,確保異步傳輸的正確性。
  • 提供超時控制和線程安全保障。
  • 適用場景:高併發、低延遲的RPC調用,尤其是需要複用長連接的內部服務。

NIO補充

這裏可以直接參考鏈接:

主要依靠的是IO多路複用機制,一個線程監聽多個通道中的事件機制

為什麼是一個線程?防止大量線程發生線程上下文切換導致CPU利用率極低

selector監聽channel中的IO事件

socket中的conntection:socket經歷過三次握手之後建立起來的連接;

socket中的accept:請求建立鏈接;

socket中read事件channel中的讀取事件;

socket中write事件channel中的讀取事件;

dubbo的連接機制補充

這裏我再補充下DefaultFuture對應的機制即可

從理論中來,到實踐中去,最終迴歸理論