本文是基於之前介紹 gRPC 開發文章的延續,代碼模塊介紹,也是基於之前示例代碼的延續。
1. ManagedChannel
ManagedChannel 是 gRPC 中用於管理客户端和服務器之間通信的核心組件。它負責連接的創建、管理、負載均衡、流量控制等功能。以下是 ManagedChannel 的主要功能和屬性。
-
連接管理:
ManagedChannel負責與服務器節點建立和維護 TCP 連接。- 支持 HTTP/2 協議的多路複用。
-
負載均衡:
- 支持多種負載均衡策略,如輪詢(Round Robin)、隨機、哈希等。
- 可以通過服務發現機制動態選擇服務器節點。
-
名稱解析:
- 支持通過 DNS、Etcd 等進行服務名稱解析,以確定目標服務器地址。
-
流量控制:
- 支持基於流的流量控制機制,以避免流量過載。
-
攔截器:
- 支持客户端攔截器,用於在請求發送和響應接收時進行自定義處理。
-
安全性:
- 支持 TLS/SSL 加密,以確保數據傳輸的安全性。
-
重試策略:
- 可以配置重試策略,以應對臨時的網絡故障或服務器錯誤。
-
超時和截止時間:
- 支持請求的超時設置和截止時間控制。
當然,ManagedChannelBuilder 是用於構建 ManagedChannel 的構建器類,它提供了多種方法來配置和定製通道的行為。下面是 ManagedChannelBuilder 的一些重要屬性和方法的詳細介紹,包括 forTarget 方法。
1.1. ManagedChannelBuilder
-
forAddress(String name, int port)
- 用於指定目標服務器的主機名和端口。
- 適合直接連接到單個服務器節點的場景。
-
forTarget(String target)
- 用於指定目標服務的 URI,這個 URI 可以包含服務名稱、負載均衡策略、端口等。
- 適合使用服務發現和負載均衡的場景。例如,
dns:///example.com:8080。
-
usePlaintext()
- 啓用明文通信,不使用 TLS/SSL 加密。
- 適用於開發和測試環境。在生產環境中,通常應使用加密通信。
-
useTransportSecurity()
- 啓用 TLS/SSL 加密,以確保數據傳輸的安全性。
- 適用於生產環境。
-
defaultLoadBalancingPolicy(String policy)
- 設置默認的負載均衡策略,如
round_robin、pick_first等。 - 這會影響請求如何在多個服務器節點之間分配。
- 設置默認的負載均衡策略,如
-
enableRetry()
- 啓用重試機制,以提高請求的可靠性。
- 通常與
maxRetryAttempts(int attempts)一起使用。
-
maxRetryAttempts(int attempts)
- 設置最大重試次數。
- 配合重試策略使用,幫助應對臨時的網絡故障。
-
idleTimeout(long value, TimeUnit unit)
- 設置通道的空閒超時時間。
- 如果通道在指定時間內未使用,將被關閉以釋放資源。
-
keepAliveTime(long keepAliveTime, TimeUnit timeUnit)
- 設置保活時間,以確保連接的活躍性。
- 定期發送保活探測包,以防止連接被網絡設備意外斷開。
-
keepAliveTimeout(long keepAliveTimeout, TimeUnit timeUnit)
- 設置保活超時時間。
- 如果在超時時間內未收到響應,連接將被認為是失敗的。
-
maxInboundMessageSize(int bytes)
- 設置允許接收的最大消息大小(字節)。
- 適用於需要接收大消息的場景。
-
intercept(ClientInterceptor... interceptors)
- 添加一個或多個客户端攔截器。
- 用於在請求發送和響應接收時執行自定義邏輯。
代碼示例
以下是一個使用 ManagedChannelBuilder 的示例,展示如何配置不同的屬性:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ClientInterceptor;
public class GrpcClientExample {
public static void main(String[] args) {
// 使用 forTarget 方法指定服務目標
ManagedChannel channel = ManagedChannelBuilder.forTarget("dns:///example.com:8080")
.useTransportSecurity() // 啓用 TLS/SSL 加密
.defaultLoadBalancingPolicy("round_robin") // 使用輪詢負載均衡策略
.enableRetry() // 啓用重試機制
.maxRetryAttempts(3) // 設置最大重試次數
.idleTimeout(5, TimeUnit.MINUTES) // 設置空閒超時時間
.keepAliveTime(1, TimeUnit.MINUTES) // 設置保活時間
.maxInboundMessageSize(10 * 1024 * 1024) // 設置最大入站消息大小為 10 MB
.intercept(new MyClientInterceptor()) // 添加自定義攔截器
.build();
// 使用存根與服務通信
// MyServiceGrpc.MyServiceBlockingStub stub = MyServiceGrpc.newBlockingStub(channel);
// 關閉通道
channel.shutdown();
}
}
// 自定義攔截器示例
class MyClientInterceptor implements ClientInterceptor {
// 實現攔截器邏輯
}
解釋
- forTarget("dns:///example.com:8080"):指定服務目標,使用 DNS 進行名稱解析和負載均衡。
- useTransportSecurity():啓用加密通信,確保數據安全。
- defaultLoadBalancingPolicy("round_robin"):配置為輪詢策略,確保請求均勻分佈到所有可用節點。
- enableRetry() 和 maxRetryAttempts(3):啓用重試機制,設置最大重試次數為 3 次,以提高可靠性。
- idleTimeout 和 keepAliveTime:配置連接的空閒超時和保活設置,以優化資源管理和連接穩定性。
- maxInboundMessageSize:允許接收的最大消息大小為 10 MB。
- intercept:添加自定義攔截器,用於在 gRPC 調用中插入自定義邏輯。
這些配置選項使得 ManagedChannelBuilder 能夠靈活地適應各種應用需求,提供可靠的 gRPC 客户端通信。
1.2. 連接管理
ManagedChannel 的連接管理是 gRPC 框架中一個核心功能,它確保客户端與服務器之間的通信高效且可靠。連接管理涉及多個方面,包括連接的建立、維護、複用、負載均衡以及連接的關閉和清理。以下是對 ManagedChannel 連接管理機制的詳細介紹:
1. 連接建立
-
目標地址解析:
- 當創建
ManagedChannel時,可以通過forAddress或forTarget方法指定目標服務器地址。 forAddress(String name, int port)用於直接指定主機名和端口。forTarget(String target)支持更復雜的目標解析,包括通過 DNS 或其他服務發現機制解析服務名稱。
- 當創建
-
協議支持:
ManagedChannel使用 HTTP/2 協議,這允許在單個 TCP 連接上進行多路複用。- 支持安全通信,通過
useTransportSecurity()方法啓用 TLS/SSL。
2. 連接維護與複用
-
多路複用:
- HTTP/2 的多路複用特性允許在一個連接上併發多個請求和響應,從而提高連接的利用率。
ManagedChannel自動管理連接的複用,無需手動干預。
-
連接保活:
- 通過
keepAliveTime(long keepAliveTime, TimeUnit timeUnit)配置保活探測,確保連接在空閒時不會被網絡設備意外斷開。 keepAliveTimeout(long keepAliveTimeout, TimeUnit timeUnit)設置保活超時時間,確保在指定時間內未收到響應時關閉連接。
- 通過
-
空閒連接管理:
idleTimeout(long value, TimeUnit unit)用於設置連接的空閒超時時間。- 當連接在指定時間內未使用時,
ManagedChannel會自動關閉連接以釋放資源。
3. 負載均衡與服務發現
-
負載均衡:
- 支持多種負載均衡策略(如輪詢、隨機選擇),可通過
defaultLoadBalancingPolicy(String policy)配置。 - 負載均衡器會在多個服務器節點之間分配請求,以實現負載均衡。
- 支持多種負載均衡策略(如輪詢、隨機選擇),可通過
-
服務發現:
ManagedChannel支持動態服務發現,通過名稱解析機制(如 DNS)自動選擇可用的服務器節點。- 這種機制確保客户端可以根據集羣的動態變化調整目標節點。
4. 連接關閉與清理
-
優雅關閉:
shutdown()方法用於請求通道的優雅關閉,允許正在進行的請求完成。awaitTermination(long timeout, TimeUnit unit)等待通道終止,確保資源的正確釋放。
-
強制關閉:
shutdownNow()方法用於強制關閉通道,立即終止所有活動請求。- 強制關閉後,通道不能再用於發送請求。
5. 代碼示例
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
public class GrpcClient {
private final ManagedChannel channel;
public GrpcClient(String host, int port) {
// 創建並配置 ManagedChannel
this.channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext() // 明文通信
.enableRetry() // 啓用重試機制
.keepAliveTime(1, TimeUnit.MINUTES) // 設置保活時間
.idleTimeout(5, TimeUnit.MINUTES) // 設置空閒超時時間
.build();
}
public void shutdown() throws InterruptedException {
// 請求優雅關閉通道
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public static void main(String[] args) throws InterruptedException {
GrpcClient client = new GrpcClient("localhost", 50051);
try {
// 執行 RPC 調用
} finally {
client.shutdown();
}
}
}
1.3. 高併發調優
在高併發場景下,如果單個連接成為瓶頸,ManagedChannel 可以通過幾種方法來優化性能,以更好地處理高負載請求。以下是一些優化策略:
1. 增加併發流限制
- HTTP/2 多路複用:默認情況下,HTTP/2 支持多路複用,即在一個連接上同時發送多個請求。但服務器和客户端都有最大併發流的限制。
- 調整最大併發流:可以通過配置來增加單個連接上允許的最大併發流數量,從而提高單個連接的利用率。
2. 使用多個連接
- 並行連接:如果一個連接的多路複用能力仍然不足以處理所有併發請求,可以考慮同時使用多個連接到同一個節點。
- 配置通道池:雖然 gRPC 本身沒有顯式的連接池配置,但可以在應用層面創建多個
ManagedChannel實例來實現類似的效果。
3. 優化負載均衡策略
- 自定義負載均衡策略:使用自定義負載均衡策略來動態調整請求分配,以便更好地利用可用的連接和節點資源。
- 健康檢查:確保負載均衡器能根據節點的健康狀態動態調整請求分配,避免將請求發送到負載過高的節點。
4. 網絡和協議優化
- 壓縮:啓用 gRPC 的壓縮功能,減少數據傳輸量,緩解網絡瓶頸。
- 連接保持活動:通過配置保持連接活動,避免因連接建立和關閉導致的開銷。
5. 服務器端優化
- 服務器擴展:在服務器端增加更多的節點或提升單個節點的處理能力,以分散和處理更多的請求。
- 請求隊列和批處理:優化服務器端的請求處理策略,例如使用請求隊列和批處理來提高吞吐量。
示例代碼:調整最大併發流
以下是如何在 Java 中調整 ManagedChannel 的最大併發流配置:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
public class HighConcurrencyClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("server.example.com", 50051)
.usePlaintext()
.maxInboundMessageSize(10 * 1024 * 1024) // 設置最大入站消息大小
.build();
// 使用存根與服務通信
// MyServiceGrpc.MyServiceBlockingStub stub = MyServiceGrpc.newBlockingStub(channel);
// MyResponse response = stub.myRpcMethod(MyRequest.newBuilder().build());
// 關閉通道
channel.shutdown();
}
}
注意:maxInboundMessageSize 並不是調整併發流的參數,而是一個示例。具體的併發流限制需要通過其他方式在服務器端配置。
1.4. 連接集羣服務
當客户端連接的服務端是集羣服務,有多個主機節點,顯然不可能依然只創建一個連接,應該想辦法基於每個主機節點創建一個單獨的連接。這裏就用到了 forTarget方法。
1.4.1. forTarget
forTarget 方法是 gRPC 中用於指定服務端地址的關鍵方法之一。它是 ManagedChannelBuilder 的一部分,用於配置客户端與服務器之間的通信渠道。forTarget 方法的使用涉及到地址解析、負載均衡等機制。
1. 基本用法
ManagedChannel channel = ManagedChannelBuilder.forTarget("your-service-address:port")
.usePlaintext() // 如果不使用 TLS
.build();
-
target: 這是一個字符串,通常是服務的地址,可以是以下幾種形式:
- 主機名和端口: 如
"localhost:50051"。 - 負載均衡器地址: 用於連接到負載均衡器,然後由負載均衡器將請求分發到後端服務器。
- URI 形式: 可以包含方案,如
"dns:///example.com:443",這對於使用自定義的命名解析器很有用。
- 主機名和端口: 如
2. 機制和功能
-
地址解析:
- gRPC 使用名稱解析器將目標字符串轉換為實際的服務器地址。默認情況下,gRPC 支持多種解析方式,包括 DNS 和靜態 IP。
- 例如,
dns:///example.com使用 DNS 解析example.com的 IP 地址。
-
負載均衡:
- gRPC 內置支持客户端負載均衡。通過
forTarget提供的地址,gRPC 客户端可以使用不同的負載均衡策略,如輪詢。 - 如果目標地址解析為多個 IP 地址,gRPC 將自動分配請求到這些地址上。
- gRPC 內置支持客户端負載均衡。通過
-
自定義命名解析器:
- 可以通過 SPI(Service Provider Interface)機制來實現自定義命名解析器。這在需要與服務發現系統集成時特別有用。
- 自定義解析器可以解析複雜的服務目標,並動態更新可用服務器列表。
-
連接管理:
ManagedChannel自動管理與服務器的連接,包括重試、連接恢復等。forTarget配置的目標是連接管理的基礎,確保客户端能夠正確定位到服務。
-
安全性:
- 如果目標地址使用安全連接(如 HTTPS),需要配置 TLS。可以通過
useTransportSecurity方法來設置。 - 對於開發和測試環境,可以使用
usePlaintext來禁用 TLS。
- 如果目標地址使用安全連接(如 HTTPS),需要配置 TLS。可以通過
1.4.2. 連接管理機制
在 gRPC 中,ManagedChannel 的設計是為了高效管理和複用連接,特別是在集羣環境下。當你使用 forTarget 方法並通過負載均衡策略連接到集羣服務時,ManagedChannel 會盡量複用現有的連接,而不是為每個節點單獨創建一個新的連接。以下是一些關鍵點,解釋了 ManagedChannel 在連接超時時間內如何管理連接:
1. 連接複用
-
HTTP/2 多路複用:
ManagedChannel使用 HTTP/2 協議,該協議支持在一個連接上併發多個請求和響應。- 這意味着即使在連接超時時間內,
ManagedChannel也會盡量複用現有的連接,而不是為每個請求創建新的連接。
-
連接池:
- 雖然
ManagedChannel本身不實現傳統的連接池,但它通過內部機制管理連接的創建和複用。 ManagedChannel會維護一個連接池,根據需要打開新的連接或複用現有的連接。
- 雖然
2. 負載均衡和連接管理
-
負載均衡策略:
- 通過
defaultLoadBalancingPolicy方法配置的負載均衡策略(如round_robin、pick_first等)決定了請求如何分配到不同的節點。 - 負載均衡器會根據策略選擇合適的節點,並嘗試複用現有的連接。
- 通過
-
連接超時和重試:
- 如果在連接超時時間內無法成功連接到某個節點,
ManagedChannel會根據配置的重試策略嘗試重新連接。 - 重試機制可以在不同節點之間切換,但仍然會盡量複用現有的連接。
- 如果在連接超時時間內無法成功連接到某個節點,
3. 具體行為
-
首次連接:
- 當第一次請求時,
ManagedChannel會根據負載均衡策略選擇一個節點並建立連接。 - 這個連接會被緩存起來,以便後續請求複用。
- 當第一次請求時,
-
後續請求:
- 對於後續的請求,
ManagedChannel會優先複用現有的連接。 - 如果現有連接不可用(例如,連接超時或節點故障),
ManagedChannel會嘗試選擇另一個節點並建立新的連接。
- 對於後續的請求,
-
連接超時:
- 如果在連接超時時間內無法成功連接到任何節點,
ManagedChannel會根據配置的重試策略進行重試。 - 重試過程中,
ManagedChannel會嘗試不同的節點,但仍然會盡量複用現有的連接。
- 如果在連接超時時間內無法成功連接到任何節點,
4. 代碼示例
以下是一個示例,展示瞭如何配置 ManagedChannel 以連接到集羣服務,並處理連接超時和重試:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
public class ClusterClient {
public static void main(String[] args) {
// 使用 forTarget 方法指定服務目標
ManagedChannel channel = ManagedChannelBuilder.forTarget("dns:///my-service-cluster")
.useTransportSecurity() // 啓用 TLS/SSL 加密
.defaultLoadBalancingPolicy("round_robin") // 使用輪詢負載均衡策略
.enableRetry() // 啓用重試機制
.maxRetryAttempts(3) // 設置最大重試次數
.idleTimeout(5, TimeUnit.MINUTES) // 設置空閒超時時間
.keepAliveTime(1, TimeUnit.MINUTES) // 設置保活時間
.build();
// 使用存根與服務通信
// MyServiceGrpc.MyServiceBlockingStub stub = MyServiceGrpc.newBlockingStub(channel);
// 執行 RPC 調用
// MyResponse response = stub.myRpcMethod(MyRequest.newBuilder().build());
// 關閉通道
channel.shutdown();
}
}
2. 攔截器
其實在上一篇文章的示例代碼中,就有用到服務端、客户端攔截器。
2.1. 概念
gRPC 攔截器(Interceptor)是 gRPC 框架中用於攔截和處理 RPC 調用過程中的請求和響應的一種機制。攔截器可以在客户端和服務器端使用,允許開發者在請求被髮送到服務器之前、以及響應被返回到客户端之前,執行一些自定義的邏輯。
1. 攔截器的類型
-
客户端攔截器(Client Interceptor):
- 用於在客户端側攔截 RPC 調用。
- 可以用於修改請求元數據、記錄日誌、執行認證和授權等。
-
服務器端攔截器(Server Interceptor):
- 用於在服務器端攔截 RPC 調用。
- 可以用於驗證請求、記錄日誌、處理異常、修改響應等。
2. 應用場景
-
日誌記錄:
- 攔截器可以用於記錄每個 RPC 調用的詳細信息,包括請求和響應的元數據、執行時間等。
- 便於監控和調試。
-
認證和授權:
- 可以在攔截器中實現身份驗證和權限檢查,確保只有經過認證的請求才能訪問服務。
- 攔截器可以從請求的元數據中提取認證信息並進行驗證。
-
異常處理:
- 服務器端攔截器可以用於統一處理異常,將內部異常轉換為標準的 gRPC 狀態碼和消息。
-
請求/響應修改:
- 可以在請求發送之前或響應返回之前對其進行修改,例如添加或移除某些元數據。
-
性能監控和度量:
- 攔截器可以用於收集性能數據,如請求的延遲、吞吐量等。
- 這些數據可以用於生成性能報告和識別瓶頸。
2.2. 使用
2.2.1. 客户端攔截器
客户端攔截器用於在客户端發送請求之前和接收到響應之後執行額外的邏輯。以下是客户端攔截器的詳細使用方法和示例代碼。
1. 創建客户端攔截器
客户端攔截器通過實現 ClientInterceptor 接口來創建。這個接口有一個方法 interceptCall,它允許你在請求發送之前和響應接收之後執行自定義邏輯。
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
public class ClientLoggingInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
System.out.println("Client sending request to method: " + method.getFullMethodName());
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
System.out.println("Client received message: " + message);
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
System.out.println("Client call closed with status: " + status);
super.onClose(status, trailers);
}
}, headers);
}
@Override
public void sendMessage(ReqT message) {
System.out.println("Client sending message: " + message);
super.sendMessage(message);
}
};
}
}
2. 使用客户端攔截器
在創建客户端通道時,通過 ClientInterceptors 工具類將攔截器附加到通道。
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
public class HelloWorldClientWithInterceptor {
private final GreeterGrpc.GreeterBlockingStub blockingStub;
public HelloWorldClientWithInterceptor(String host, int port) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build();
// 將攔截器附加到通道
channel = ClientInterceptors.intercept(channel, new ClientLoggingInterceptor());
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
public void greet(String name) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloResponse response = blockingStub.sayHello(request);
System.out.println("Greeting: " + response.getMessage());
}
public static void main(String[] args) {
HelloWorldClientWithInterceptor client = new HelloWorldClientWithInterceptor("localhost", 50051);
client.greet("World");
}
}
2.2.2. 服務端攔截器
服務端攔截器用於在服務端接收到請求之前和發送響應之後執行額外的邏輯。以下是服務端攔截器的詳細使用方法和示例代碼。
1. 創建服務端攔截器
服務端攔截器通過實現 ServerInterceptor 接口來創建。這個接口有一個方法 interceptCall,它允許你在請求到達服務實現之前和響應發送之前執行自定義邏輯。
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
public class ServerLoggingInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
System.out.println("Server received call to method: " + call.getMethodDescriptor().getFullMethodName());
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
@Override
public void onMessage(ReqT message) {
System.out.println("Server received message: " + message);
super.onMessage(message);
}
@Override
public void onHalfClose() {
System.out.println("Server stream half-closed");
super.onHalfClose();
}
@Override
public void onComplete() {
System.out.println("Server stream completed");
super.onComplete();
}
@Override
public void onCancel() {
System.out.println("Server stream cancelled");
super.onCancel();
}
@Override
public void onReady() {
System.out.println("Server stream ready");
super.onReady();
}
};
}
}
2. 使用服務端攔截器
在創建服務端時,通過 ServerInterceptors 工具類將攔截器附加到服務。
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
public class HelloWorldServerWithInterceptor {
private final Server server;
public HelloWorldServerWithInterceptor(int port) {
server = ServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(new GreeterImpl(), new ServerLoggingInterceptor()))
.build();
}
public void start() throws IOException {
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloWorldServerWithInterceptor.this.stop();
System.err.println("*** server shut down");
}));
}
public void stop() {
if (server != null) {
server.shutdown();
}
}
private static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
HelloResponse response = HelloResponse.newBuilder()
.setMessage("Hello " + req.getName())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
HelloWorldServerWithInterceptor server = new HelloWorldServerWithInterceptor(50051);
server.start();
server.server.awaitTermination();
}
}
3. 請求頭
gRPC 請求頭的實現基於 Metadata 類,這是一種用於在客户端和服務器之間傳遞附加信息的鍵值對結構。gRPC 請求頭允許你在調用過程中傳遞額外的上下文信息,比如認證令牌、跟蹤 ID 等。下面是對 gRPC 請求頭的詳細介紹,包括其結構、使用方法和常見應用場景。
3.1. 元數據(Metadata)結構
-
鍵(Key):
- 鍵是一個字符串,必須以小寫字母開頭,並且只能包含小寫字母、數字和連字符(
-)。 - 對於二進制數據,鍵必須以
-bin結尾。
- 鍵是一個字符串,必須以小寫字母開頭,並且只能包含小寫字母、數字和連字符(
-
值(Value):
- 值可以是 ASCII 字符串或字節數組。
-
gRPC 提供了兩種編組器(Marshaller)用於處理這兩種類型的值:
Metadata.ASCII_STRING_MARSHALLER用於 ASCII 字符串。Metadata.BINARY_BYTE_MARSHALLER用於字節數組。
1. 創建 Metadata 實例
你可以創建一個 Metadata 實例來存儲鍵值對:
Metadata metadata = new Metadata();
2. 添加和檢索鍵值對
-
添加鍵值對:
Metadata.Key<String> key = Metadata.Key.of("custom-header", Metadata.ASCII_STRING_MARSHALLER); metadata.put(key, "value"); -
檢索鍵值對:
String value = metadata.get(key);
3. 二進制數據
對於二進制數據,鍵需要以 -bin 結尾:
Metadata.Key<byte[]> binaryKey = Metadata.Key.of("custom-header-bin", Metadata.BINARY_BYTE_MARSHALLER);
metadata.put(binaryKey, new byte[]{0x01, 0x02});
byte[] binaryValue = metadata.get(binaryKey);
3.2. 使用請求頭
在 gRPC 中,通過 ServerInterceptor 可以攔截請求並訪問請求頭。如果你希望在具體的服務實現類中也能夠訪問這些請求頭,可以將請求頭通過上下文傳遞給服務實現。以下是如何在 GreeterImpl 服務實現中訪問請求頭的完整示例:
- 定義服務和攔截器:
import io.grpc.*;
public class HelloWorldServerWithHeaders {
private final Server server;
public HelloWorldServerWithHeaders(int port) {
server = ServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(new GreeterImpl(), new HeaderInterceptor()))
.build();
}
public void start() throws IOException {
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloWorldServerWithHeaders.this.stop();
System.err.println("*** server shut down");
}));
}
public void stop() {
if (server != null) {
server.shutdown();
}
}
private static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
// 從當前上下文中獲取請求頭
Metadata headers = ServerCallContext.getHeaders();
Metadata.Key<String> customHeaderKey = Metadata.Key.of("custom-header", Metadata.ASCII_STRING_MARSHALLER);
String headerValue = headers.get(customHeaderKey);
System.out.println("Service received header: " + headerValue);
HelloResponse response = HelloResponse.newBuilder()
.setMessage("Hello " + req.getName())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
private static class HeaderInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
// 將請求頭存儲在上下文中
Context ctx = Context.current().withValue(ServerCallContext.HEADERS_KEY, headers);
return Contexts.interceptCall(ctx, call, headers, next);
}
}
public static void main(String[] args) throws IOException, InterruptedException {
HelloWorldServerWithHeaders server = new HelloWorldServerWithHeaders(50051);
server.start();
server.server.awaitTermination();
}
}
- 上下文類定義:
為了在服務實現中訪問請求頭,我們需要一個上下文類來存儲和獲取這些頭信息:
import io.grpc.Context;
import io.grpc.Metadata;
public class ServerCallContext {
// 創建一個上下文鍵,用於存儲 Metadata
public static final Context.Key<Metadata> HEADERS_KEY = Context.key("metadata-headers");
public static Metadata getHeaders() {
// 從當前上下文中獲取存儲的 Metadata
return HEADERS_KEY.get();
}
}
説明
-
HeaderInterceptor:
- 在攔截器中,使用
Context將請求頭信息存儲起來。 - 使用
Contexts.interceptCall將上下文與調用關聯。
- 在攔截器中,使用
-
GreeterImpl:
- 在服務實現中,使用自定義的
ServerCallContext類獲取請求頭信息。 - 通過
Context機制,可以在服務實現中訪問Metadata對象。
- 在服務實現中,使用自定義的
通過這種方式,你可以在 gRPC 服務實現中訪問客户端發送的請求頭信息。使用 Context 是在 gRPC 中傳遞請求上下文信息的一種常見方法。
3.3. 常見應用場景
-
認證和授權:
- 通過請求頭傳遞認證令牌(如 JWT、OAuth)。
- 服務器攔截器可以驗證令牌的有效性並拒絕未授權的請求。
-
請求跟蹤:
- 傳遞唯一的請求跟蹤 ID,用於分佈式追蹤系統。
- 幫助在微服務架構中追蹤請求的生命週期。
-
版本控制和特性標誌:
- 客户端可以通過請求頭傳遞所需的 API 版本或啓用的特性。
- 服務器可以根據這些信息調整響應行為。
-
自定義業務邏輯:
- 傳遞特定業務上下文信息以影響服務端的處理邏輯。
4. 狀態碼
在 gRPC 中,狀態碼用於表示 RPC 調用的結果。它們幫助客户端了解請求是成功還是失敗,以及如果失敗,是什麼原因導致的。gRPC 定義了一組狀態碼,每個狀態碼都有特定的用途和應用場景。以下是 gRPC 狀態碼的常見應用場景及其詳細使用示例。
4.1. 常見狀態碼
-
OK (Status.Code.OK)
- 場景:請求成功,服務端正確處理了請求並返回了期望的結果。
- 示例:客户端請求數據檢索操作,服務端成功返回數據。
-
CANCELLED (Status.Code.CANCELLED)
- 場景:操作被客户端取消。
- 示例:客户端在長時間的流式響應中取消了請求。
-
使用:
responseObserver.onError(Status.CANCELLED.withDescription("Operation was cancelled by client").asRuntimeException());
-
UNKNOWN (Status.Code.UNKNOWN)
- 場景:發生未知錯誤,通常是由於服務器端拋出異常。
- 示例:服務端捕獲到未處理的異常。
-
使用:
try { // some operation that might throw an exception } catch (Exception e) { responseObserver.onError(Status.UNKNOWN.withDescription("An unknown error occurred").withCause(e).asRuntimeException()); }
-
INVALID_ARGUMENT (Status.Code.INVALID_ARGUMENT)
- 場景:客户端提供了無效的參數。
- 示例:客户端發送了不符合格式要求的請求數據。
-
使用:
if (request.getName() == null || request.getName().isEmpty()) { responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Name cannot be empty").asRuntimeException()); }
-
DEADLINE_EXCEEDED (Status.Code.DEADLINE_EXCEEDED)
- 場景:操作未在指定的時間內完成。
- 示例:客户端設置的請求超時時間被超過。
-
使用:
responseObserver.onError(Status.DEADLINE_EXCEEDED.withDescription("Deadline exceeded").asRuntimeException());
-
NOT_FOUND (Status.Code.NOT_FOUND)
- 場景:請求的資源不存在。
- 示例:客户端請求的記錄在數據庫中不存在。
-
使用:
responseObserver.onError(Status.NOT_FOUND.withDescription("Resource not found").asRuntimeException());
-
ALREADY_EXISTS (Status.Code.ALREADY_EXISTS)
- 場景:嘗試創建已存在的資源。
- 示例:客户端嘗試創建已存在的用户賬號。
-
使用:
responseObserver.onError(Status.ALREADY_EXISTS.withDescription("Resource already exists").asRuntimeException());
-
PERMISSION_DENIED (Status.Code.PERMISSION_DENIED)
- 場景:客户端沒有執行該操作的權限。
- 示例:未授權的用户嘗試訪問受限資源。
-
使用:
responseObserver.onError(Status.PERMISSION_DENIED.withDescription("Permission denied").asRuntimeException());
-
RESOURCE_EXHAUSTED (Status.Code.RESOURCE_EXHAUSTED)
- 場景:資源耗盡,通常是配額或限流問題。
- 示例:超出 API 請求配額。
-
使用:
responseObserver.onError(Status.RESOURCE_EXHAUSTED.withDescription("Resource exhausted, quota exceeded").asRuntimeException());
-
FAILED_PRECONDITION (Status.Code.FAILED_PRECONDITION)
- 場景:操作的前置條件未滿足。
- 示例:在某個特定狀態下操作不被允許。
-
使用:
responseObserver.onError(Status.FAILED_PRECONDITION.withDescription("Failed precondition").asRuntimeException());
-
ABORTED (Status.Code.ABORTED)
- 場景:操作中止,通常由於併發問題。
- 示例:由於版本衝突導致事務中止。
-
使用:
responseObserver.onError(Status.ABORTED.withDescription("Operation aborted due to conflict").asRuntimeException());
-
OUT_OF_RANGE (Status.Code.OUT_OF_RANGE)
- 場景:操作嘗試的範圍超出有效範圍。
- 示例:請求的分頁索引超出有效範圍。
-
使用:
responseObserver.onError(Status.OUT_OF_RANGE.withDescription("Requested index is out of range").asRuntimeException());
-
UNIMPLEMENTED (Status.Code.UNIMPLEMENTED)
- 場景:未實現或不支持的操作。
- 示例:客户端調用了服務端未實現的 API 方法。
-
使用:
responseObserver.onError(Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException());
-
INTERNAL (Status.Code.INTERNAL)
- 場景:內部錯誤,通常是服務端的問題。
- 示例:服務端的代碼邏輯錯誤導致請求失敗。
-
使用:
responseObserver.onError(Status.INTERNAL.withDescription("Internal server error").asRuntimeException());
-
UNAVAILABLE (Status.Code.UNAVAILABLE)
- 場景:服務不可用,通常是由於臨時故障或過載。
- 示例:服務停機或網絡故障。
-
使用:
responseObserver.onError(Status.UNAVAILABLE.withDescription("Service is currently unavailable").asRuntimeException());
-
DATA_LOSS (Status.Code.DATA_LOSS)
- 場景:數據丟失或損壞。
- 示例:由於存儲問題導致數據丟失。
-
使用:
responseObserver.onError(Status.DATA_LOSS.withDescription("Data loss occurred").asRuntimeException());
-
UNAUTHENTICATED (Status.Code.UNAUTHENTICATED)
- 場景:請求未通過認證。
- 示例:客户端未提供有效的身份驗證憑據。
-
使用:
responseObserver.onError(Status.UNAUTHENTICATED.withDescription("Request not authenticated").asRuntimeException());
4.2. 開發示例
1. 定義服務(Proto 文件)
首先,我們定義一個簡單的 gRPC 服務和消息:
// user.proto
syntax = "proto3";
package example;
service UserService {
rpc GetUser (GetUserRequest) returns (GetUserResponse);
}
message GetUserRequest {
string user_id = 1;
}
message GetUserResponse {
string user_id = 1;
string name = 2;
string email = 3;
}
編譯這個 proto 文件以生成 Java 代碼。
2. 服務端實現
在服務端實現中,我們處理請求並使用 gRPC 狀態碼來應對不同的情況:
// UserServiceImpl.java
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
String userId = request.getUserId();
// 檢查無效參數
if (userId == null || userId.isEmpty()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("User ID cannot be empty")
.asRuntimeException());
return;
}
// 模擬數據庫查找
if ("123".equals(userId)) {
GetUserResponse response = GetUserResponse.newBuilder()
.setUserId(userId)
.setName("John Doe")
.setEmail("john.doe@example.com")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} else {
// 模擬資源未找到
responseObserver.onError(Status.NOT_FOUND
.withDescription("User not found")
.asRuntimeException());
}
}
}
3. 啓動服務器
// UserServer.java
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class UserServer {
private final int port;
private final Server server;
public UserServer(int port) {
this.port = port;
this.server = ServerBuilder.forPort(port)
.addService(new UserServiceImpl())
.build();
}
public void start() throws IOException {
server.start();
System.out.println("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
UserServer.this.stop();
System.err.println("*** server shut down");
}));
}
public void stop() {
if (server != null) {
server.shutdown();
}
}
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
UserServer server = new UserServer(50051);
server.start();
server.blockUntilShutdown();
}
}
4. 客户端實現
客户端負責調用服務端的方法並處理返回的狀態碼:
// UserClient.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
public class UserClient {
private final UserServiceGrpc.UserServiceBlockingStub blockingStub;
public UserClient(String host, int port) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext() // 不使用 SSL/TLS
.build();
blockingStub = UserServiceGrpc.newBlockingStub(channel);
}
public void getUser(String userId) {
GetUserRequest request = GetUserRequest.newBuilder().setUserId(userId).build();
try {
GetUserResponse response = blockingStub.getUser(request);
System.out.println("User found: " + response.getName() + " (" + response.getEmail() + ")");
} catch (StatusRuntimeException e) {
System.err.println("RPC failed: " + e.getStatus());
}
}
public static void main(String[] args) {
UserClient client = new UserClient("localhost", 50051);
client.getUser("123"); // 應該成功
client.getUser(""); // 應該返回 INVALID_ARGUMENT
client.getUser("999"); // 應該返回 NOT_FOUND
}
}
5. 運行示例
- 編譯 proto 文件:使用
protoc編譯user.proto以生成 Java 代碼。 - 運行服務器:啓動
UserServer。 - 運行客户端:啓動
UserClient,觀察不同的請求如何返回不同的狀態碼。