1. 官網介紹
Dubbo 協議採用單一長連接和 NIO 異步通訊,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的情況。
1.1. 特點
dubbo RPC 是dubbo體系中最核心的一種高性能、高吞吐量的遠程調用方式,可以稱之為 多路複用的TCP長連接調用。
主要用於兩個dubbo系統之間作遠程調用,特別適合高併發、小數據的互聯網場景。反之,Dubbo 協議不適合傳送大數據量的服務,比如傳文件,傳視頻等,除非請求量很低。
- 長連接:避免了每次調用新建TCP連接,提高了調用的響應速度。
- 多路複用:單個TCP連接可交替傳輸多個請求和響應的消息,降低了連接的等待閒置時間,從而減少了同樣併發數下的網絡連接數,提高了系統吞吐量
Q1:為什麼要消費者比提供者個數多?
因為 dubbo 協議採用單一長連接,單連接的傳輸是有上限的,根據測試經驗數據每條連接最多隻能壓滿 7MByte(不同的環境可能不一樣,供參考)。
假設網絡為千兆網卡 1024Mbit=128MByte,理論上 1 個服務提供者需要 20 個服務消費者(對應 20 個連接)才能壓滿網卡。
Q2:為什麼不能傳大包?
因為 dubbo 協議採用單一長連接,如果每次請求的數據包大小為 500KByte,假設網絡為千兆網卡 1024Mbit=128MByte,每條連接最大 7MByte (不同的環境可能不一樣),TPS 結果是:
- 單個服務提供者的 TPS(每秒處理事務數)最大為:128MByte / 500KByte = 262。
- 單個消費者調用單個服務提供者的 TPS (每秒處理事務數)最大為:7MByte / 500KByte = 14。
如果能接受,可以考慮使用,否則網絡將成為瓶頸。因為對於單個連接通道來看,倘若有一個超過7MByte的數據包請求,那麼這一秒內就只有這一個請求執行了,其他請求都要阻塞等待。
Q3:為什麼採用異步單一長連接?
因為服務的現狀大都是服務提供者少,通常只有幾台機器,而服務的消費者多,可能整個網站都在訪問該服務。比如 Morgan 的提供者只有 6 台提供者,卻有上百台消費者,每天有 1.5 億次調用,如果採用常規的 hessian 服務,服務提供者很容易就被壓跨。
通過單一連接,保證單一消費者不會壓死提供者。另外長連接,減少連接握手驗證等,並使用異步 IO,複用線程池,防止 C10K 問題。
C10K問題:是關於如何在一個服務器上同時處理10,000個併發連接的問題。每個連接都需要分配一個獨立的線程或進程來處理,都會佔用一定的系統資源,而且線程或進程之間的上下文切換會帶來額外的開銷,影響系統的性能。
解決C10K問題的關鍵在於採用事件驅動編程、非阻塞I/O和輕量級線程等技術。通過這些技術,可以顯著提高服務器處理併發連接的能力,從而更好地支持高併發場景下的應用。
1.2. 基本組成
- Transporter: mina, netty, grizzy
- Serialization: dubbo, hessian2, java, json
- Dispatcher: all, direct, message, execution, connection
- ThreadPool: fixed, cached
默認使用基於 netty 3.2.5.Final 和 hessian2 3.2.1-fixed-2(Alibaba embed version) 的 tbremoting 交互。
- 連接個數:單連接
- 連接方式:長連接
- 傳輸協議:TCP
- 傳輸方式:NIO 異步傳輸
- 序列化:Hessian 二進制序列化
- 適用範圍:傳入傳出參數數據包較小(建議小於100K),消費者比提供者個數多,單一消費者無法壓滿提供者,儘量不要用 dubbo 協議傳輸大文件或超大字符串。
- 適用場景:常規遠程服務方法調用
2. 源碼實現
Dubbo 協議的實現邏輯涉及多個模塊:
- 協議接口 (
Protocol):定義了服務導出和引用。 - DubboProtocol:實現服務的導出和引用。
- 網絡傳輸:使用 Netty 實現高效的網絡 IO。
- 編解碼 (
Codec):負責請求和響應的序列化與反序列化。 - 請求處理:通過
Invoker實現遠程調用。 - 心跳檢測和重連機制:保持連接的活躍性,並在異常時進行重試。
這些模塊的協同工作,使得 Dubbo 協議能夠在大規模分佈式系統中高效地進行服務調用,並在連接異常時進行自動恢復。
2.1. Protocol 接口
Protocol 是 Dubbo 的核心接口之一,負責定義服務的導出和引用。
public interface Protocol {
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
2.2. DubboProtocol 類
DubboProtocol 是 Protocol 接口的實現類,處理服務的導出和引用邏輯。
public class DubboProtocol implements Protocol {
private static final DubboProtocol INSTANCE = new DubboProtocol();
private final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<>();
public static DubboProtocol getDubboProtocol() {
return INSTANCE;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
openServer(url);
DubboExporter<T> exporter = new DubboExporter<>(invoker, url, exporterMap);
exporterMap.put(url.getServiceKey(), exporter);
return exporter;
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
DubboInvoker<T> invoker = new DubboInvoker<>(type, url, clients);
return invoker;
}
private void openServer(URL url) {
String key = url.getAddress();
ExchangeServer server = serverMap.get(key);
if (server == null) {
server = Exchangers.bind(url, requestHandler);
serverMap.put(key, server);
}
}
}
2.3. Transport 層
Dubbo 使用 Netty 作為底層網絡傳輸框架,提供高效的網絡 IO。
- NettyTransporter:實現了
Transporter接口,負責綁定和連接。
public class NettyTransporter implements Transporter {
@Override
public Server bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
}
- NettyServer 和 NettyClient:負責服務端和客户端的網絡通信。
public class NettyServer implements Server {
private final ServerBootstrap bootstrap;
private final Channel channel;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyCodecAdapter(handler).getDecoder());
ch.pipeline().addLast(new NettyCodecAdapter(handler).getEncoder());
ch.pipeline().addLast(new NettyHandler(handler));
}
});
channel = bootstrap.bind(url.getPort()).sync().channel();
}
}
2.4. Codec 編解碼
Codec 負責請求和響應的編解碼,Dubbo 使用自定義的二進制協議。
- DubboCodec:負責請求和響應的編碼與解碼。
public class DubboCodec extends ExchangeCodec {
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
Serialization serialization = getSerialization(channel);
ObjectOutput output = serialization.serialize(channel.getUrl(), out);
output.writeObject(data);
}
@Override
protected Object decodeRequestData(Channel channel, ObjectInput in) throws IOException {
Serialization serialization = getSerialization(channel);
ObjectInput input = serialization.deserialize(channel.getUrl(), in);
return input.readObject();
}
}
2.5. Invoker 和 Exporter
- Invoker:表示一個可調用的服務接口,是服務調用的核心模型。
- Exporter:負責服務的導出。
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
RpcResult result = new RpcResult();
ResponseFuture future = clients[0].request(inv);
return (Result) future.get();
}
}
2.6. 心跳檢測與異常處理
Dubbo 使用心跳機制來保持長連接的活躍,並檢測連接狀態。心跳異常處理通常涉及到連接的重試和恢復機制。
- HeartbeatHandler:處理心跳檢測。
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Heartbeat) {
channel.send(new HeartbeatResponse());
} else {
super.received(channel, message);
}
}
}
- 重試和恢復機制:在連接中斷時,Dubbo 會嘗試重新連接。這通常由底層的 Netty 實現,通過
ChannelHandler的事件機制處理連接的中斷和重連。
2.7. 連接管理
- 重連機制:在連接中斷後,Netty 可以通過
ChannelFutureListener或定時任務來實現重連。
public class ReconnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
reconnect(ctx);
}
private void reconnect(ChannelHandlerContext ctx) {
final EventLoop loop = ctx.channel().eventLoop();
loop.schedule(() -> {
// 重新連接邏輯
}, 1L, TimeUnit.SECONDS);
}
}
3. 連接池管理
3.1. 連接池作用
Dubbo協議中的連接池機制是為了提高網絡通信效率和資源利用率而設計的。連接池可以複用已經建立的網絡連接,減少頻繁建立和關閉連接帶來的開銷,從而提高系統的性能和穩定性。
1. 減少連接開銷
- 減少建立連接的時間:每次建立TCP連接都需要經歷三次握手的過程,這會消耗一定的時間和資源。
- 減少關閉連接的時間:每次關閉TCP連接需要經歷四次揮手的過程,同樣會消耗時間和資源。
2. 提高性能
- 複用連接:連接池可以複用已經建立的連接,避免頻繁的連接建立和關閉,從而提高系統的吞吐量。
- 資源管理:連接池可以統一管理和控制連接資源,避免資源浪費。
3. 提高穩定性
- 連接重用:連接池中的連接可以被多次重用,減少了因頻繁建立連接而導致的不穩定因素。
- 故障恢復:連接池可以監控連接的狀態,自動處理故障連接,提高系統的穩定性。
3.2. 連接池實現
Dubbo協議中的連接池主要通過以下幾個方面實現:
- 連接池管理
- 連接池大小:Dubbo允許配置連接池的最大連接數,以控制連接資源的使用。
- 連接池維護:Dubbo會定期檢查連接池中的連接狀態,清理無效連接,並保持連接池的健康狀態。
- 連接複用
- 長連接:Dubbo默認使用長連接機制,即在服務消費者與服務提供者之間建立持久連接,並複用這些連接進行多次請求。
- 短連接:雖然不常見,Dubbo也支持短連接機制,即每次請求都會建立新的連接並在請求結束後關閉連接。
- 連接生命週期管理
- 連接建立:當服務消費者首次請求服務提供者時,Dubbo會建立一個新的連接並將其加入連接池。
- 連接使用:服務消費者從連接池中獲取連接,並通過該連接發送請求。
- 連接釋放:請求完成後,連接會被歸還到連接池中,供後續請求複用。
- 連接關閉:當連接池中的連接超過最大連接數或連接失效時,Dubbo會關閉部分連接,以保持連接池的健康狀態。
3.3. 連接池配置參數
在 Dubbo 中,連接池的參數配置對於優化系統的性能和穩定性非常重要。通過合理配置這些參數,可以確保系統在高併發場景下能夠高效地處理請求並保持良好的資源利用率。以下是 Dubbo 連接池的主要參數及其詳細説明:
1. connections
- 含義:指定每個消費者與每個提供者節點之間的最大連接數。
- 默認值:通常默認值為 1 或 2。
- 作用:控制每個提供者節點上的連接數量,以平衡性能和資源使用。
-
示例配置:
<dubbo:consumer connections="10" />
2. lazy
- 含義:是否懶加載連接。如果設置為
true,則在首次請求時才建立連接;如果設置為false,則在啓動時立即建立所有連接。 - 默認值:默認值為
false。 - 作用:減少啓動時的連接建立開銷,適用於連接建立較為耗時的場景。
-
示例配置:
<dubbo:consumer lazy="true" />
3. timeout
- 含義:請求超時時間,單位為毫秒。
- 默認值:通常默認值為 10000 毫秒(10 秒)。
- 作用:設置請求的最大等待時間,超過該時間後請求將被取消。
-
示例配置:
<dubbo:reference interface="com.example.Service" timeout="5000" />
4. retries
- 含義:請求失敗後的重試次數。
- 默認值:通常默認值為 2。
- 作用:在網絡不穩定或提供者暫時不可用時自動重試請求。
-
示例配置:
<dubbo:reference interface="com.example.Service" retries="3" />
5. loadbalance
- 含義:負載均衡策略。
- 默認值:默認值為
roundrobin(輪詢)。 - 作用:選擇不同的負載均衡算法來分發請求到不同的提供者節點。
- 可選值:
roundrobin(輪詢)、random(隨機)、leastactive(最少活躍調用數)、consistenthash(一致性哈希)等。 -
示例配置:
<dubbo:reference interface="com.example.Service" loadbalance="leastactive" />
6. threadpool
- 含義:線程池類型。
- 默認值:默認值為
fixed。 - 作用:控制處理請求的線程池類型,影響併發處理能力。
- 可選值:
fixed(固定大小線程池)、cached(可緩存線程池)、scheduled(定時任務線程池)等。 -
示例配置:
<dubbo:protocol name="dubbo" threadpool="fixed" threads="20" />
7. threads
- 含義:線程池中的線程數量。
- 默認值:默認值為 200。
- 作用:控制線程池中處理請求的線程數量。
-
示例配置:
<dubbo:protocol name="dubbo" threads="100" />
3.4. 連接複用限制
既然是連接池,當現有連接達到上限之後,就會觸發創建新連接。那麼既然 dubbo協議 是基於 tcp 連接實現的,複用一個 tcp 連接,怎麼判斷是否達到連接複用的限制呢?
在 Dubbo 中,連接複用限制通常涉及到單個連接上可以同時處理的請求數量。儘管 Dubbo 使用了 Netty 的異步非阻塞 I/O 模型來支持高併發,但在實際應用中,單個連接的複用能力可能受到一些因素的限制。以下是一些可能的限制因素:
- 協議層限制
單連接併發請求數:某些協議(如 HTTP/1.1)在單個連接上可能會有併發請求數的限制。雖然 Dubbo 默認使用的協議是基於長連接和異步通信的,但在某些情況下,協議層面的限制可能會影響單個連接的複用能力。 - 應用層限制
請求處理能力:即使在一個連接上可以同時發送多個請求,服務端處理這些請求的能力也是有限的。如果一個連接上積壓了過多的請求,可能會導致延遲增加或超時。 - 網絡層限制
TCP 窗口大小:TCP 連接的窗口大小可能限制單個連接上傳輸的數據量,從而間接影響併發請求的處理能力。 - Dubbo 配置和實現
連接池配置:雖然 Dubbo 的連接池並沒有明確限制單個連接的併發請求數,但通過合理的連接池配置(如設置最大連接數)可以間接影響連接的複用策略。
資源管理:為了避免單個連接上的過載,Dubbo 可能會在現有連接達到一定負載時創建新的連接。這種負載可以是請求數量、請求大小或其他資源使用指標。
實際應用中的上限
在實際應用中,"達到上限"通常指的是當一個連接的負載(如併發請求數或請求處理時間)超出某個閾值時,系統會選擇創建新的連接來分擔負載。這種策略可以幫助系統在高併發場景下保持穩定的性能。
4. 連接池實現
4.1. 每個提供者獨立連接池
首先,Dubbo 的連接池和 OkHttp 等不同,每個服務消費者對應請求每個服務提供者,就是一個連接池。
另外當服務提供者以集羣方式部署時,服務消費者會與集羣中的每個提供者節點建立連接。
具體來説,Dubbo 的連接池機制會為每個提供者 IP(或節點)維護一組連接。這意味着,對於每一個提供者實例,消費者都會有獨立的連接池來管理與該實例的連接。
-
每個提供者節點獨立的連接池:
- 對於每個服務提供者節點(IP:Port),消費者會維護一個獨立的連接池。這樣可以確保請求能夠被均勻地分發到集羣中的不同節點。
- 這種方式有助於實現負載均衡,因為消費者可以根據負載均衡策略(如隨機、輪詢、最少活躍調用等)選擇合適的提供者節點。
-
連接數配置:
- 可以通過 Dubbo 的配置參數(如
connections)來指定每個消費者到每個提供者節點的最大連接數。這有助於控制系統的資源使用。 - 例如,設置
connections=10意味着每個消費者到每個提供者節點最多可以建立 10 個連接。
- 可以通過 Dubbo 的配置參數(如
-
連接複用與長連接:
- Dubbo 使用長連接機制,這意味着一旦連接建立,消費者會盡量複用這些連接來處理多個請求,而不是頻繁地創建和銷燬連接。
- 連接池的使用使得連接的管理更加高效,減少了連接建立和關閉的開銷。
4.2. 代碼實現
在Dubbo中,當消費者需要調用多個不同的服務提供者(可能是集羣中的多個服務器)時,連接池的管理變得更為複雜。為了更好地理解和解決這個問題,我們需要詳細探討連接池的管理和負載均衡策略。
展示一個示例代碼,展示如何在Dubbo中管理連接池,並處理多個服務提供者的情況。
1. NettyChannelPool 類
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class NettyChannelPoolManager {
private final Map<String, NettyChannelPool> poolMap = new ConcurrentHashMap<>();
private final int maxConnections;
public NettyChannelPoolManager(int maxConnections) {
this.maxConnections = maxConnections;
}
public synchronized Channel getChannel(String host, int port) {
String key = host + ":" + port;
NettyChannelPool pool = poolMap.computeIfAbsent(key, k -> new NettyChannelPool(host, port, maxConnections));
return pool.acquire();
}
public synchronized void releaseChannel(Channel channel) {
String key = extractKeyFromChannel(channel);
NettyChannelPool pool = poolMap.get(key);
if (pool != null) {
pool.release(channel);
}
}
public synchronized void shutdown() {
for (NettyChannelPool pool : poolMap.values()) {
pool.shutdown();
}
poolMap.clear();
}
private String extractKeyFromChannel(Channel channel) {
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
return remoteAddress.getHostString() + ":" + remoteAddress.getPort();
}
private static class NettyChannelPool {
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;
private final FixedChannelPool channelPool;
private final int maxConnections;
public NettyChannelPool(String host, int port, int maxConnections) {
this.maxConnections = maxConnections;
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyChannelHandler());
}
});
channelPool = new FixedChannelPool(maxConnections) {
@Override
protected Channel newChannel() {
try {
Future<Channel> future = bootstrap.connect(host, port).addListener((GenericFutureListener<Future<Channel>>) f -> {
if (!f.isSuccess()) {
f.cause().printStackTrace();
}
});
return future.sync().channel();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
}
public Channel acquire() {
return channelPool.acquire();
}
public void release(Channel channel) {
channelPool.release(channel);
}
public void shutdown() {
channelPool.shutdown();
eventLoopGroup.shutdownGracefully();
}
private static class MyChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel active: " + ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel inactive: " + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Received message: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
}
2. 服務消費者端
public class NettyClient {
public static void main(String[] args) {
int maxConnections = 100;
NettyChannelPoolManager poolManager = new NettyChannelPoolManager(maxConnections);
// 服務提供者 A
String hostA = "localhost";
int portA = 20880;
// 服務提供者 B
String hostB = "localhost";
int portB = 20881;
// 調用服務提供者 A
for (int i = 0; i < 10; i++) {
Channel channelA = poolManager.getChannel(hostA, portA);
channelA.writeAndFlush("Hello, Service A!");
poolManager.releaseChannel(channelA);
}
// 調用服務提供者 B
for (int i = 0; i < 10; i++) {
Channel channelB = poolManager.getChannel(hostB, portB);
channelB.writeAndFlush("Hello, Service B!");
poolManager.releaseChannel(channelB);
}
poolManager.shutdown();
}
}
解釋
-
連接池管理:
NettyChannelPoolManager類管理多個服務提供者的連接池。- 一個
NettyChannelPool對象其實就對應一個連接池,一個Channel對應一個連接。 - 使用一個
ConcurrentHashMap來存儲每個服務提供者的NettyChannelPool對象。
-
獲取連接:
getChannel方法根據服務提供者的IP地址和端口號獲取一個空閒連接。- 如果連接池中沒有空閒連接,會新建一個連接。
-
釋放連接:
releaseChannel方法將使用過的連接歸還到對應的連接池中。
-
關閉連接池:
shutdown方法關閉所有連接池和EventLoopGroup。