目錄
- 1. 為什麼需要DefaultFuture機制?
- 1.1 單一長連接的挑戰
- 1.2 示例場景
- 2. DefaultFuture的核心設計
- 2.1 關鍵組件
- 2.2 響應匹配流程
- 2.3 超時處理
- 3. DefaultFuture的線程安全與性能優化
- 3.1 線程安全設計
- 3.2 性能優化
- 4. 對比其他RPC框架的響應關聯機制
- 5. 實際代碼示例
- 5.1 客户端發送請求
- 5.2 服務端處理請求
- 5.3 響應返回與匹配
- 6. 常見問題與解決方案
- 6.1 問題1:Future超時未清理
- 6.2 問題2:請求ID衝突
- 6.3 問題3:NIO線程阻塞
- 7. 總結
- NIO補充
- dubbo的連接機制補充
1. 為什麼需要DefaultFuture機制?
1.1 單一長連接的挑戰
- 複用性:所有請求通過同一個Netty
Channel(封裝了Socket)發送,響應也通過同一通道返回。 - 亂序風險:若服務端返回響應的順序與客户端發送請求的順序不一致(例如請求A耗時長,請求B先返回),客户端無法直接區分哪個響應對應哪個請求。
1.2 示例場景
java
// 客户端發送兩個請求(Req1和Req2)
channel.writeAndFlush(new Request(id=1, method="getUser")); // Req1
channel.writeAndFlush(new Request(id=2, method="getOrder")); // Req2
// 服務端返回兩個響應(順序可能顛倒)
channel.writeAndFlush(new Response(id=2, result="Order123")); // Res2(先返回)
channel.writeAndFlush(new Response(id=1, result="User456")); // Res1(後返回)
- 問題:客户端如何知道第一個收到的響應
Res2對應的是Req2而非Req1?
2. DefaultFuture的核心設計
2.1 關鍵組件
- Request ID:每個請求攜帶唯一ID(通常為自增序列),服務端返回響應時需攜帶相同ID。
- Future映射表:
- 客户端維護一個全局的
ConcurrentMap<Long, DefaultFuture>,鍵為請求ID,值為對應的Future對象。 - 示例代碼:
java
// 偽代碼:DefaultFuture的靜態映射表
private static final ConcurrentMap<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
// 發送請求時註冊Future
public static DefaultFuture sendRequest(Request request, Channel channel) {
DefaultFuture future = new DefaultFuture(channel, request);
FUTURES.put(request.getId(), future); // 註冊到映射表
channel.writeAndFlush(request);
return future;
}
2.2 響應匹配流程
- 服務端返回響應:
- 響應中包含請求ID(如
Response(id=2, result="Order123"))。
- 客户端接收響應:
- Netty的
ChannelHandler解析響應,提取請求ID。 - 從
FUTURES映射表中查找對應的DefaultFuture。 - 喚醒阻塞的線程(通過
Future.set(result)),返回響應結果。 - 示例代碼:
java
// 偽代碼:響應處理邏輯
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Response res = (Response) msg;
DefaultFuture future = FUTURES.get(res.getId()); // 根據ID查找Future
if (future != null) {
future.doReceived(res); // 設置結果並喚醒線程
FUTURES.remove(res.getId()); // 清理已完成的Future
}
}
2.3 超時處理
- 超時機制:若響應未在指定時間內到達,
DefaultFuture會觸發超時異常。 - 示例代碼:
java
// 偽代碼:超時檢查
public class TimeoutCheckTask implements Runnable {
public void run() {
for (DefaultFuture future : FUTURES.values()) {
if (future.isTimeout()) {
future.cancel(true); // 觸發超時異常
FUTURES.remove(future.getRequest().getId());
}
}
}
}
3. DefaultFuture的線程安全與性能優化
3.1 線程安全設計
- ConcurrentMap:使用
ConcurrentHashMap保證多線程下FUTURES映射表的併發訪問安全。 - 原子操作:
Future.set(result)和Future.cancel()通過CAS或同步鎖保證原子性。
3.2 性能優化
- ID複用:請求ID通常為自增長整型,避免頻繁創建大對象。
- 弱引用清理:對已完成或超時的
Future及時從映射表中移除,防止內存泄漏。 - 異步通知:通過Netty的
EventLoop線程處理響應,避免阻塞業務線程。
4. 對比其他RPC框架的響應關聯機制
|
框架 |
機制 |
特點 |
|
Dubbo |
DefaultFuture + Request ID
|
輕量級,依賴全局映射表,適合高併發長連接場景。
|
|
gRPC |
HTTP/2 Stream ID + Header
|
基於HTTP/2流標識,天然支持多路複用,但需依賴協議層支持。
|
|
Thrift |
序列化ID + 阻塞隊列
|
簡單但性能較低,每個請求需阻塞等待響應,不適合高併發。
|
|
Spring Cloud |
Feign + Ribbon + Hystrix
|
依賴負載均衡和熔斷器,響應關聯通過HTTP回調實現,靈活性高但開銷較大。
|
5. 實際代碼示例
5.1 客户端發送請求
java
// 偽代碼:Dubbo客户端調用
RpcContext.getContext().setAttachment("interface", "com.example.UserService");
UserService userService = (UserService) context.getBean("userService");
// 發送請求並獲取Future
DefaultFuture future = DefaultFuture.sendRequest(
new Request(id=1, method="getUser", params={"userId": "1001"}),
channel
);
// 異步等待結果(或直接get()阻塞)
future.get(5000, TimeUnit.MILLISECONDS); // 超時時間5秒
5.2 服務端處理請求
java
// 偽代碼:Dubbo服務端實現
@Service
public class UserServiceImpl implements UserService {
public User getUser(String userId) {
// 模擬耗時操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User(userId, "Alice");
}
}
5.3 響應返回與匹配
java
// 偽代碼:服務端返回響應
public void channelRead0(ChannelHandlerContext ctx, Request req) {
// 處理請求並生成響應
User user = handleRequest(req);
Response res = new Response(req.getId(), user);
// 通過同一Channel返回響應
ctx.writeAndFlush(res);
}
6. 常見問題與解決方案
6.1 問題1:Future超時未清理
- 現象:大量超時請求的
Future堆積在FUTURES表中,導致內存泄漏。 - 解決:
- 啓用定時任務清理超時
Future(Dubbo默認已實現)。 - 手動調用
RpcContext.getContext().removeContext()清理上下文。
6.2 問題2:請求ID衝突
- 現象:高併發下請求ID重複,導致響應匹配錯誤。
- 解決:
- 使用原子類(如
AtomicLong)生成全局唯一ID。 - 結合機器IP和端口生成分佈式唯一ID(如Snowflake算法)。
6.3 問題3:NIO線程阻塞
- 現象:響應處理邏輯耗時過長,阻塞Netty的
EventLoop線程。 - 解決:
- 將耗時操作提交到業務線程池處理:
java
public void channelRead0(ChannelHandlerContext ctx, Response res) {
executorService.submit(() -> {
// 耗時處理邏輯
processResponse(res);
});
}
7. 總結
- DefaultFuture機制是Dubbo實現單一長連接下請求-響應有序關聯的核心組件。
- 關鍵點:
- 通過全局
ConcurrentMap維護請求ID與Future的映射關係。 - 依賴請求ID和響應匹配,確保異步傳輸的正確性。
- 提供超時控制和線程安全保障。
- 適用場景:高併發、低延遲的RPC調用,尤其是需要複用長連接的內部服務。
NIO補充
這裏可以直接參考鏈接:
主要依靠的是IO多路複用機制,一個線程監聽多個通道中的事件機制
為什麼是一個線程?防止大量線程發生線程上下文切換導致CPU利用率極低
selector監聽channel中的IO事件
socket中的conntection:socket經歷過三次握手之後建立起來的連接;
socket中的accept:請求建立鏈接;
socket中read事件channel中的讀取事件;
socket中write事件channel中的讀取事件;
dubbo的連接機制補充
這裏我再補充下DefaultFuture對應的機制即可
從理論中來,到實踐中去,最終迴歸理論