博客 / 詳情

返回

gRPC - 介紹與開發 1

gRPC 是一個高性能、開源的遠程過程調用(RPC)框架,由 Google 開發。它旨在提供跨語言的通信能力,適用於從移動設備到數據中心服務器的各種環境。

1. 核心概念

  1. Protocol Buffers(protobuf)

    • gRPC 使用 Protocol Buffers 作為其接口描述語言和數據序列化協議。開發者通過定義 .proto 文件來指定服務和消息格式。Protocol Buffers 提供了一種緊湊的二進制格式,支持快速序列化和反序列化。
  2. HTTP/2 協議

    • gRPC 基於 HTTP/2 進行通信。HTTP/2 提供了多路複用、頭部壓縮、流控制和雙向流等特性,顯著提高了傳輸效率和性能。
  3. 服務定義和代碼生成

    • 開發者在 .proto 文件中定義服務和 RPC 方法。gRPC 編譯器 protoc 根據這些定義生成客户端和服務器端的代碼存根,簡化了開發工作。
  4. 多種通信模式
    gRPC 支持四種服務方法類型,這使得 gRPC 可以適應多種應用場景:

    • 單一響應 RPC
    • 服務器流式 RPC
    • 客户端流式 RPC
    • 雙向流式 RPC
  5. 安全性

    • gRPC 支持 TLS/SSL 加密,提供安全的通信通道。它還支持多種身份驗證機制。

2. 實現原理

  1. 服務定義和編譯
  • 定義服務:在 .proto 文件中定義服務接口和消息類型。

    syntax = "proto3";
    
    service Greeter {
        rpc SayHello (HelloRequest) returns (HelloResponse);
    }
    
    message HelloRequest {
        string name = 1;
    }
    
    message HelloResponse {
        string message = 1;
    }
  • 編譯生成代碼:使用 protoc 編譯器生成客户端和服務器代碼。這些代碼包含了與服務交互所需的存根和骨架。
  1. 客户端和服務器的實現
  • 服務器端:實現生成的服務接口,定義具體的業務邏輯。啓動 gRPC 服務器,註冊服務。

    public class GreeterImpl extends GreeterGrpc.GreeterImplBase {
        @Override
        public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
            HelloResponse response = HelloResponse.newBuilder()
                                                  .setMessage("Hello, " + req.getName())
                                                  .build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
    }
  • 客户端:使用生成的客户端存根與服務器交互。客户端可以選擇同步或異步調用。

    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                                                  .usePlaintext()
                                                  .build();
    GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
    HelloResponse response = stub.sayHello(HelloRequest.newBuilder().setName("World").build());
  1. 傳輸層:基於 Netty 的實現
  • Netty 集成:gRPC 的 Java 實現使用 Netty 作為傳輸層框架。Netty 提供了異步事件驅動的 I/O 模型,支持高效的網絡通信。
  • HTTP/2 支持:Netty 提供對 HTTP/2 的支持,使得 gRPC 可以利用 HTTP/2 的特性,如多路複用和頭部壓縮。
  1. 數據序列化和反序列化
  • gRPC 使用 Protocol Buffers 進行消息的序列化和反序列化。Protocol Buffers 提供了高效的二進制格式,減少了數據傳輸的開銷。
  1. 攔截器和中間件
  • gRPC 支持攔截器機制,允許在請求和響應的生命週期中插入自定義邏輯。這類似於中間件,可以用於實現日誌記錄、認證、審計等功能。
  1. 負載均衡和服務發現
  • gRPC 提供了多種負載均衡策略,並且可以與服務發現機制集成。客户端可以自動選擇最佳的服務器實例來處理請求。

3. 代碼示例

要在 Java 項目中使用 gRPC,你需要設置 Maven 項目並添加必要的依賴項,然後編寫服務和客户端代碼。以下是一個簡單的示例,展示如何在 Java 中使用 gRPC。

3.1. 設置 Maven 項目

首先,確保你的 pom.xml 文件包含以下 gRPC 和 Protocol Buffers 相關的依賴和插件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>grpc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <grpc.version>1.56.0</grpc.version>
        <protobuf.version>3.21.12</protobuf.version>
        <os.detected.classifier>osx-x86_64</os.detected.classifier>
    </properties>

    <dependencies>
        <!-- gRPC dependencies -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <!-- Protocol Buffers -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!-- Protobuf Maven Plugin -->
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- OS Maven Plugin -->
            <plugin>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.7.0</version>
            </plugin>
        </plugins>
    </build>
</project>

os.detected.classifier 與使用的系統有關

3.2. 定義 Protocol Buffers 文件

首先,定義 .proto 文件以支持各種類型的流:

src/main/proto 目錄下創建一個文件 hello.proto

syntax = "proto3";

option java_package = "com.example.grpc";
option java_outer_classname = "HelloProto";

service Greeter {
  // 單一響應
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // 客户端流
  rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply) {}

  // 服務器流
  rpc SayHelloServerStream (HelloRequest) returns (stream HelloReply) {}

  // 雙向流
  rpc SayHelloBidirectionalStream (stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}
基於 .proto 生成 Java 文件

在命令行中運行以下命令來生成 Java 類:

mvn clean compile

這將使用 protobuf-maven-plugin 生成 gRPC 服務和消息類。

生成類在 /target目錄下,即 /target/generated-sources/protobuf/grpc-java/org/example/grpc

3.3. 實現 gRPC 服務

在服務端實現這些不同類型的流,同時使用攔截器來處理請求頭:

import io.grpc.*;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.StringUtil;
import org.example.grpc.interceptor.ServerHeaderInterceptor;
import org.example.grpc.interceptor.ServerLoggingInterceptor;
import java.io.IOException;
import java.util.logging.Logger;


public class HelloServer {

    private static final Logger logger = Logger.getLogger(HelloServer.class.getName());
    private Server server;

    public static void main(String[] args) throws IOException, InterruptedException {
        final HelloServer server = new HelloServer();
        server.start();
        server.blockUntilShutdown();
    }

    private void start() throws IOException {
        int port = 50051;
        server = ServerBuilder.forPort(port)
                .addService(new GreeterImpl())
                .intercept(new ServerHeaderInterceptor()) // 添加攔截器
                .intercept(new ServerLoggingInterceptor())
                .build()
                .start();
        logger.info("Server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            HelloServer.this.stop();
            System.err.println("*** server shut down");
        }));
    }

    private void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
        /**
         * 單一響應
         */
        @Override
        public void sayHello(HelloProto.HelloRequest req, StreamObserver<HelloProto.HelloReply> responseObserver) {
            System.out.println("【單一響應】收到消息 - " + req.getName());
            HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
                    .setMessage("【單一響應】Hello " + req.getName())
                    .build();
            System.out.println("【單一響應】響應消息 - " + req.getName());
            responseObserver.onNext(reply);
            System.out.println("【單一響應】服務端發起關閉");
            responseObserver.onCompleted();
        }

        /**
         * 客户端流
         */
        @Override
        public StreamObserver<HelloProto.HelloRequest> sayHelloClientStream(final StreamObserver<HelloProto.HelloReply> responseObserver) {
            return new StreamObserver<HelloProto.HelloRequest>() {
                StringBuilder names = new StringBuilder();

                @Override
                public void onNext(HelloProto.HelloRequest request) {
                    if (names.length() > 0) {
                        names.append(", ");
                    }
                    System.out.println("【客户端流】收到消息 - " + request.getName());
                    names.append(request.getName());
                }

                @Override
                public void onError(Throwable t) {
                    logger.warning("Error in client stream: " + t);
                }

                @Override
                public void onCompleted() {
                    System.out.println("【客户端流】收到客户端已關閉消息");
                    HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
                            .setMessage("【客户端流】Hello " + names.toString())
                            .build();
                    System.out.println("【客户端流】響應消息 - " + names.toString());
                    responseObserver.onNext(reply);
                    System.out.println("【客户端流】服務端發起關閉");
                    responseObserver.onCompleted();
                }
            };
        }

        /**
         * 服務端流
         */
        @Override
        public void sayHelloServerStream(HelloProto.HelloRequest req, StreamObserver<HelloProto.HelloReply> responseObserver) {
            for (int i = 0; i < 5; i++) {
                String msg = req.getName() + StringUtil.SPACE + i;
                HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
                        .setMessage("【服務端流】Hello " + msg)
                        .build();
                System.out.println("【服務端流】響應消息 - " + msg);
                responseObserver.onNext(reply);
            }
            System.out.println("【服務端流】服務端發起關閉");
            responseObserver.onCompleted();
        }

        /**
         * 雙向流
         */
        @Override
        public StreamObserver<HelloProto.HelloRequest> sayHelloBidirectionalStream(final StreamObserver<HelloProto.HelloReply> responseObserver) {
            return new StreamObserver<HelloProto.HelloRequest>() {
                @Override
                public void onNext(HelloProto.HelloRequest request) {
                    HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
                            .setMessage("Hello " + request.getName())
                            .build();
                    System.out.println("【雙向流】收到消息 - " + request.getName());
                    System.out.println("【雙向流】回覆消息 - " + request.getName());
                    responseObserver.onNext(reply);
                }

                @Override
                public void onError(Throwable t) {
                    logger.warning("Error in bidirectional stream: " + t);
                }

                @Override
                public void onCompleted() {
                    System.out.println("【雙向流】收到客户端已關閉消息");
                    System.out.println("【雙向流】服務端發起關閉");
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

3.4. 實現 gRPC 客户端

編寫客户端代碼來調用這些不同類型的流:

抱歉,以下是完整的客户端代碼示例,包括所有類型的流調用:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.example.grpc.interceptor.ClientLoggingInterceptor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloClient {
    private static final Logger logger = Logger.getLogger(HelloClient.class.getName());
    private final ManagedChannel channel;
    private final GreeterGrpc.GreeterBlockingStub blockingStub;
    private final GreeterGrpc.GreeterStub asyncStub;

    public HelloClient(String host, int port) {
        channel = ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .intercept(new ClientLoggingInterceptor())
                .build();

        blockingStub = GreeterGrpc.newBlockingStub(channel);
        asyncStub = GreeterGrpc.newStub(channel);
    }

    public static void main(String[] args) throws Exception {
        HelloClient client = new HelloClient("localhost", 50051);
        try {
            client.greet("world");
            client.greetClientStream("Alice", "Bob", "Charlie");
            client.greetServerStream("Dave");
            client.greetBidirectionalStream("Eve", "Frank");
        } finally {
            client.shutdown();
        }
    }


    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /**
     * 單一響應
     */
    public void greet(String name) {
        HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
        HelloProto.HelloReply response;
        try {
            System.out.println("【單一響應】發送消息: " + name);
            response = blockingStub.sayHello(request);
            System.out.println("【單一響應】收到響應: " + response.getMessage());
        } catch (Exception e) {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getMessage());
        }
    }

    /**
     * 客户端流
     */
    public void greetClientStream(String... names) {
        StreamObserver<HelloProto.HelloReply> responseObserver = new StreamObserver<HelloProto.HelloReply>() {
            @Override
            public void onNext(HelloProto.HelloReply value) {
                System.out.println("【客户端流】收到響應: " + value.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                logger.warning("【客户端流】Error in client stream: " + t);
            }

            @Override
            public void onCompleted() {
                System.out.println("【客户端流】收到服務端已關閉消息");
            }
        };

        StreamObserver<HelloProto.HelloRequest> requestObserver = asyncStub.sayHelloClientStream(responseObserver);
        for (String name : names) {
            HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
            System.out.println("【客户端流】發送消息 - " + name);
            requestObserver.onNext(request);
        }
        System.out.println("【客户端流】客户端發起關閉");
        requestObserver.onCompleted();
    }

    /**
     * 服務端流
     */
    public void greetServerStream(String name) {
        HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
        StreamObserver<HelloProto.HelloReply> responseObserver = new StreamObserver<HelloProto.HelloReply>() {
            @Override
            public void onNext(HelloProto.HelloReply value) {
                System.out.println("【服務端流】收到響應: " + value.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                logger.warning("【服務端流】Error in server stream: " + t);
            }

            @Override
            public void onCompleted() {
                System.out.println("【服務端流】收到服務端已關閉消息");
            }
        };
        asyncStub.sayHelloServerStream(request, responseObserver);
    }

    /**
     * 雙向流
     */
    public void greetBidirectionalStream(String... names) {
        StreamObserver<HelloProto.HelloReply> responseObserver = new StreamObserver<HelloProto.HelloReply>() {
            @Override
            public void onNext(HelloProto.HelloReply value) {
                System.out.println("【雙向流】收到響應: " + value.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                logger.warning("【雙向流】Error in bidirectional stream: " + t);
            }

            @Override
            public void onCompleted() {
                System.out.println("【雙向流】收到服務端已關閉消息");
            }
        };

        StreamObserver<HelloProto.HelloRequest> requestObserver = asyncStub.sayHelloBidirectionalStream(responseObserver);
        for (String name : names) {
            HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
            requestObserver.onNext(request);
        }
        System.out.println("【雙向流】客户端發起關閉");
        requestObserver.onCompleted();
    }
}

3.5. 實現攔截器

gRPC 支持攔截器來處理請求頭等。以下是一個簡單的攔截器示例:

服務端攔截器 1
import io.grpc.*;

public class ServerHeaderInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {

        System.out.println("【ServerHeaderInterceptor】Received headers: " + headers);

        // 可以在這裏檢查或修改請求頭
        return next.startCall(call, headers);
    }
}
服務端攔截器 2
import io.grpc.*;

public class ServerLoggingInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        System.out.println("【ServerLoggingInterceptor】Server received call to method: " + call.getMethodDescriptor().getFullMethodName());
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
            @Override
            public void onMessage(ReqT message) {
                System.out.println("【ServerLoggingInterceptor】Server received message: " + message);
                super.onMessage(message);
            }

            @Override
            public void onHalfClose() {
                System.out.println("【ServerLoggingInterceptor】Server stream half-closed");
                super.onHalfClose();
            }

            @Override
            public void onComplete() {
                System.out.println("【ServerLoggingInterceptor】Server stream completed");
                super.onComplete();
            }

            @Override
            public void onCancel() {
                System.out.println("【ServerLoggingInterceptor】Server stream cancelled");
                super.onCancel();
            }

            @Override
            public void onReady() {
                System.out.println("【ServerLoggingInterceptor】Server stream ready");
                super.onReady();
            }
        };
    }
}
客户端攔截器 1
import io.grpc.*;

public class ClientLoggingInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                System.out.println("【ClientLoggingInterceptor】Client sending request to method: " + method.getFullMethodName());
                super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onMessage(RespT message) {
                        System.out.println("【ClientLoggingInterceptor】Client received message: " + message);
                        super.onMessage(message);
                    }

                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        System.out.println("【ClientLoggingInterceptor】Client call closed with status: " + status);
                        super.onClose(status, trailers);
                    }
                }, headers);
            }

            @Override
            public void sendMessage(ReqT message) {
                System.out.println("【ClientLoggingInterceptor】Client sending message: " + message);
                super.sendMessage(message);
            }
        };
    }
}

3.6. 運行服務和客户端

啓動服務器
  1. 編譯並運行 HelloWorldServermain 方法。
  2. 服務器將在 localhost 的端口 50051 上啓動並開始監聽。
啓動客户端
  1. 編譯並運行 HelloWorldClientmain 方法。
  2. 客户端將連接到服務器並執行以下操作:

    • 使用單一響應調用 SayHello
    • 使用客户端流調用 SayHelloClientStream
    • 使用服務器流調用 SayHelloServerStream
    • 使用雙向流調用 SayHelloBidirectionalStream
使用測試化工具作為客户端

現在很多測試化工具以及支持調試 gRPC 服務了,例如 Apifox,只需要將 .proto 接口描述文件上傳到客户端,就可以自動生成客户端服務。

4. 代碼解釋

4.1. 同步與異步

  • GreeterBlockingStub:用於同步阻塞調用,適合簡單的請求-響應模式,不需要高併發。
  • GreeterStubdd:用於異步非阻塞調用,適合高併發和複雜流式通信。

選擇使用哪種存根類,取決於你的應用需求和併發要求。在需要處理大量併發請求或複雜流式通信的場景下,GreeterStub 更加合適。而對於簡單的請求-響應通信且不關心阻塞的場景,GreeterBlockingStub 則提供了更簡單的編程模型。

4.1.1.GreeterGrpc.GreeterBlockingStub

  • 同步調用

    • GreeterBlockingStub 提供同步的阻塞調用。這意味着當你調用一個 RPC 方法時,調用線程會阻塞,直到服務器返回響應或發生錯誤。
  • 使用場景

    • 適用於簡單的請求-響應模式,客户端可以承受阻塞調用。
    • 適用於需要按順序處理請求,並且不需要高併發的場景。
  • 優點

    • 編程簡單,易於理解和使用。
    • 適合不需要高併發的簡單應用。
  • 缺點

    • 調用是阻塞的,可能會導致資源浪費,尤其是在需要處理大量併發請求時。
  • 示例使用

    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                                                  .usePlaintext()
                                                  .build();
    GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);
    
    HelloRequest request = HelloRequest.newBuilder().setName("World").build();
    HelloResponse response = blockingStub.sayHello(request);
    System.out.println("Greeting: " + response.getMessage());

4.1.2.GreeterGrpc.GreeterStub

  • 異步調用

    • GreeterStub 提供異步的非阻塞調用。調用一個 RPC 方法時,方法會立即返回,結果會在回調中處理。
  • 使用場景

    • 適用於需要高併發和低延遲的場景。
    • 適合複雜的流式通信模式(如客户端流服務器流雙向流)。
  • 優點

    • 非阻塞調用,提高了系統的併發能力和資源利用率。
    • 更適合實時性要求高的應用。
  • 缺點

    • 編程相對複雜,需要處理回調或使用異步框架來管理響應。
  • 示例使用

    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                                                  .usePlaintext()
                                                  .build();
    GreeterGrpc.GreeterStub asyncStub = GreeterGrpc.newStub(channel);
    
    HelloRequest request = HelloRequest.newBuilder().setName("World").build();
    asyncStub.sayHello(request, new StreamObserver<HelloResponse>() {
        @Override
        public void onNext(HelloResponse response) {
            System.out.println("Greeting: " + response.getMessage());
        }
    
        @Override
        public void onError(Throwable t) {
            System.err.println("Error: " + t);
        }
    
        @Override
        public void onCompleted() {
            System.out.println("Completed");
        }
    });

4.2. onCompleted

在 gRPC 中,當服務器端調用 StreamObserver#onCompleted() 時,客户端會收到通知。這是 gRPC 框架處理流式通信的一部分,用於指示流的正常結束。

  1. 服務器端調用 onCompleted():

    • 當服務器完成了對所有消息的處理併發送完所有需要的響應消息後,服務器端會調用 responseObserver.onCompleted()
    • 這表示服務器端的響應流已經結束,沒有更多的消息會發送到客户端。
  2. 客户端接收 onCompleted() 通知:

    • 客户端的 StreamObserver 實現會收到 onCompleted() 調用的通知。
    • 客户端可以在 onCompleted() 方法中執行一些收尾工作,比如釋放資源或更新狀態。

4.2.1. 調用場景

1. 單一響應(Unary RPC)
  • 客户端和服務器端:在單一響應模式中,onCompleted() 不需要顯式調用。請求和響應都是單個消息,gRPC 框架會自動處理流的結束。
2. 客户端流(Client Streaming RPC)
  • 客户端:需要調用 onCompleted()。客户端在發送完所有請求消息後,應調用 onCompleted() 來通知服務器請求流已結束。服務器在接收到 onCompleted() 後開始處理請求。
  • 服務器端:不需要顯式調用 onCompleted()。服務器端在處理完所有請求後返回單個響應,流會自動結束。
3. 服務器流(Server Streaming RPC)
  • 客户端:不需要調用 onCompleted()。客户端發送單個請求後,等待服務器的響應流。
  • 服務器端:需要調用 onCompleted()。服務器在發送完所有響應消息後,調用 onCompleted() 來通知客户端響應流已結束。
4. 雙向流(Bidirectional Streaming RPC)
  • 客户端:需要調用 onCompleted()。客户端在發送完所有請求消息後,應調用 onCompleted() 來通知服務器請求流已結束。
  • 服務器端:需要調用 onCompleted()。服務器在發送完所有響應消息後,調用 onCompleted() 來通知客户端響應流已結束。
5. 編程規範

當然,在 gRPC 的單一響應模式中,雖然框架會自動處理流的結束,但明確調用 responseObserver.onCompleted() 是一個良好的編程實踐。這麼做有幾個原因:

  1. 明確性和可讀性:即使框架會自動處理調用,明確調用 onCompleted() 可以讓代碼更具可讀性,清晰地表明流在此處結束。這有助於其他開發人員理解代碼邏輯。
  2. 一致性:在所有流模式中顯式調用 onCompleted() 可以保持代碼風格的一致性。這有助於開發人員在處理不同類型的流時使用相同的模式和習慣。
  3. 未來的兼容性:雖然當前版本的 gRPC 可能在某些情況下自動處理 onCompleted(),但顯式調用確保代碼在未來版本中的行為是一致的。
  4. 自定義行為:在某些情況下,你可能需要在調用 onCompleted() 之前執行一些特定的邏輯或清理操作。顯式調用 onCompleted() 讓你有機會在結束流之前插入這些操作。

雖然在單一響應模式下顯式調用 onCompleted() 並不是絕對必要的,但它仍然是一個推薦的做法,尤其是在團隊開發中,有助於代碼的維護和理解。

user avatar sdttttt 頭像 cloudwego 頭像 liangwt 頭像
3 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.