博客 / 詳情

返回

手搓RPC框架系列(二):核心功能實現與架構原則應用

文 / Kenyon,資深軟件架構師,15年軟件開發和技術管理經驗,從程序員做到企業技術高管,專注技術管理、架構設計、AI技術應用和落地。

由於公眾號推流的原因,請在關注頁右上角加星標,這樣才能及時收到新文章的推送。

引言

在上一篇文章中,我們基於架構設計原則設計了RPC框架的基礎架構。今天,我們將進入實戰階段,實現RPC框架的核心功能,包括服務代理、序列化、網絡通信等模塊。在實現過程中,我們將重點展示如何將SOLID原則、高內聚低耦合、KISS等架構設計原則應用到實際代碼中。

一、核心組件的實現

1. 序列化模塊(Serializer)

遵循開閉原則,我們設計了Serializer接口,並提供了JSON實現:

// 序列化接口,支持擴展不同的序列化方式
public interface Serializer {
    /**
     * 將對象序列化為字節數組
     *
     * @param obj 要序列化的對象
     * @param <T> 對象類型
     * @return 序列化後的字節數組
     * @throws Exception 序列化異常
     */
    <T> byte[] serialize(T obj) throws Exception;

    /**
     * 將字節數組反序列化為對象
     *
     * @param bytes 序列化後的字節數組
     * @param clazz 對象類型
     * @param <T>   對象類型
     * @return 反序列化後的對象
     * @throws Exception 反序列化異常
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;

    /**
     * 序列化類型枚舉
     */
    enum Type {
        //目前暫時只是支持JSON,後續可以在這裏添加要支持的其他序列化方式
        JSON(1);

        private final int code;

        Type(int code) {
            this.code = code;
        }

        public int getCode() {
            return code;
        }

        /**
         * 根據code查找對應的序列化類型
         *
         * @param code 序列化類型碼
         * @return 序列化類型
         */
        public static Type findByCode(int code) {
            for (Type type : Type.values()) {
                if (type.code == code) {
                    return type;
                }
            }
            return JSON; // 默認使用JSON
        }
    }
}

// JSON序列化實現
public class JsonSerializer implements Serializer {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public <T> byte[] serialize(T obj) throws Exception {
        if (obj == null) {
            return new byte[0];
        }
        return objectMapper.writeValueAsBytes(obj);
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        return objectMapper.readValue(bytes, clazz);
    }
}

2. 網絡傳輸模塊(Transport)

基於單一職責原則,我們將網絡傳輸模塊拆分為客户端和服務端:

// 網絡傳輸客户端接口
public interface TransportClient {
    void connect(InetSocketAddress address);
    byte[] send(byte[] data) throws Exception;
    void close();
}

// 網絡傳輸服務端接口
public interface TransportServer {
    void start(int port, RequestHandler handler);
    void stop();
    int getPort();
}

// 請求處理器接口
public interface RequestHandler {
    byte[] handle(byte[] request);
}

// 使用Netty實現的傳輸客户端
public class NettyTransportClient implements TransportClient {
    private static final Logger logger = LoggerFactory.getLogger(NettyTransportClient.class);

    private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
    private Channel channel;
    private EventLoopGroup group;
    private ResponseHandler responseHandler;

    @Override
    public void connect(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();

        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, DEFAULT_CONNECT_TIMEOUT)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 處理粘包問題
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // 字節數組編解碼器
                            pipeline.addLast(new ByteArrayDecoder());
                            pipeline.addLast(new ByteArrayEncoder());
                            // 客户端處理器
                            NettyClientHandler clientHandler = new NettyClientHandler();
                            pipeline.addLast(clientHandler);
                        }
                    });

            // 連接服務端
            ChannelFuture future = bootstrap.connect(address).sync();
            this.channel = future.channel();

            // 初始化響應處理器
            responseHandler = new ResponseHandler();
            // 設置客户端處理器的響應處理器
            ((NettyClientHandler) channel.pipeline().last()).setResponseHandler(responseHandler);
        } catch (Exception e) {
            logger.error("Failed to connect to server: {}", address, e);
            throw new RuntimeException("Failed to connect to server: " + address, e);
        }
    }

    @Override
    public byte[] send(byte[] data) throws Exception {
        if (channel == null || !channel.isActive()) {
            throw new IllegalStateException("Channel is not connected");
        }

        // 發送數據
        channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                Throwable cause = future.cause();
                if (cause instanceof Exception) {
                    responseHandler.setException((Exception) cause);
                } else {
                    responseHandler.setException(new RuntimeException(cause));
                }
            }
        });

        // 等待響應
        return responseHandler.waitForResponse();
    }

    @Override
    public void close() {
        if (channel != null) {
            channel.close();
        }
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * 客户端處理器
     */
    private static class NettyClientHandler extends SimpleChannelInboundHandler<byte[]> {
        private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

        private ResponseHandler responseHandler;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
            if (responseHandler != null) {
                responseHandler.setResponse(msg);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("Exception in Netty client handler", cause);
            if (responseHandler != null) {
                if (cause instanceof Exception) {
                    responseHandler.setException((Exception) cause);
                } else {
                    responseHandler.setException(new RuntimeException(cause));
                }
            }
            ctx.close();
        }

        public void setResponseHandler(ResponseHandler responseHandler) {
            this.responseHandler = responseHandler;
        }
    }

    /**
     * 響應處理器
     */
    private static class ResponseHandler {
        private final CountDownLatch latch = new CountDownLatch(1);
        private byte[] response;
        private Exception exception;

        public byte[] waitForResponse() throws Exception {
            if (!latch.await(30, TimeUnit.SECONDS)) {
                throw new RuntimeException("Request timeout");
            }
            if (exception != null) {
                throw exception;
            }
            return response;
        }

        public void setResponse(byte[] response) {
            this.response = response;
            latch.countDown();
        }

        public void setException(Exception exception) {
            this.exception = exception;
            latch.countDown();
        }
    }
}

3. 服務代理模塊(Service Proxy)

使用迪米特法則,代理模塊只與必要的組件通信:

// RPC客户端核心類
public class ServiceProxy implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(ServiceProxy.class);
    // 服務接口類
    private final Class<?> serviceClass;
    // 服務註冊中心
    private final RegistryCenter registryCenter;
    // 負載均衡策略 
    private final LoadBalance loadBalance;

    /**
     * 構造函數
     *
     * @param serviceClass   服務接口類
     * @param registryCenter 服務註冊中心
     */
    public ServiceProxy(Class<?> serviceClass, RegistryCenter registryCenter) {
        this(serviceClass, registryCenter, new RandomLoadBalance());
    }

    /**
     * 構造函數
     *
     * @param serviceClass   服務接口類
     * @param registryCenter 服務註冊中心
     * @param loadBalance    負載均衡策略
     */
    public ServiceProxy(Class<?> serviceClass, RegistryCenter registryCenter, LoadBalance loadBalance) {
        this.serviceClass = serviceClass;
        this.registryCenter = registryCenter;
        this.loadBalance = loadBalance;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 創建RPC請求的對象,把調用的服務接口類、方法名、參數類型、參數值等信息封裝到請求對象中
        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setServiceName(serviceClass.getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);

        logger.debug("Sending RPC request: {}, service: {}, method: {}",
                request.getRequestId(), request.getServiceName(), request.getMethodName());

        // 從註冊中心獲取服務地址列表
        List<InetSocketAddress> addresses = registryCenter.discover(serviceClass.getName());
        if (addresses == null || addresses.isEmpty()) {
            throw new RuntimeException("No service available for: " + serviceClass.getName());
        }

        // 使用負載均衡策略選擇服務地址
        InetSocketAddress address = loadBalance.select(serviceClass.getName(), addresses);
        if (address == null) {
            throw new RuntimeException("No service address selected for: " + serviceClass.getName());
        }

        logger.debug("Selected service address: {}", address);

        // 創建客户端併發送請求,這裏暫時使用Netty作為網絡傳輸組件
        TransportClient client = new NettyTransportClient();
        try {
            // 創建序列化器,這裏暫時使用JSON序列化,後續可以添加其他序列化方式,並且改成讀取配置的方式來確定使用哪種序列化方式
            Serializer serializer = new JsonSerializer();

            // 連接到服務端
            client.connect(address);

            // 序列化請求
            byte[] requestData = serializer.serialize(request);

            // 發送請求並獲取響應數據
            byte[] responseData = client.send(requestData);

            // 反序列化響應
            RpcResponse response = serializer.deserialize(responseData, RpcResponse.class);

            if (response.isSuccess()) {
                return response.getResult();
            } else {
                throw new RuntimeException("RPC call failed: " + response.getError());
            }
        } finally {
            client.close();
        }
    }
}

二、架構設計原則的應用總結

在上面代碼實現的過程中,我們分別應用了以下架構設計原則:

1. SOLID原則

  • 比如每個組件都只是負責一個明確的功能,這就很好符合了單一職責原則
  • 然後涉及到後續可能要調整或者擴展到地方我們都是通過面向接口的編程,然後再通過實現接口的方式來實現組件的可擴展,這樣就很好地應用了開閉原則里氏替換原則
  • 在編寫接口的時候,我們也遵循了接口隔離原則里氏替換原則,即每個組件都有自己的接口,而且接口只包含必要的方法,然後組件接口的實現類可以隨時替換父類的實現,而不會影響到程序的正常運行。
  • 最後,我們使用其他模塊的時候,依賴都是依賴接口,然後再通過構造函數的方式來注入具體的實現類,這樣高層模塊就不需要依賴底層模塊,從而做到了依賴倒置原則

2. 通用設計原則

在實現RPC框架的過程中,我們也應用了多個通用設計原則:

  • 我們在代碼的實現過程中非常注重代碼的簡潔,基本都是做最基礎的設計和實現,避免了過度設計,實現的過程中也只實現當前必要的核心功能,確保代碼的可讀性和可維護性,同時也考慮到了代碼的性能和擴展性,這就很好地體現了KISS原則
  • 我們設計和實現的每個組件內部的功能都緊密相關的(高內聚),而組件之間基本都是通過抽象接口來進行通信,減少跟實現模塊或者代碼的直接依賴(低耦合),這樣的設計使得各組件可以獨立演化和維護,這就是高內聚低耦合原則的應用。
  • 在模塊和組件之間,我們遵循"只與直接朋友通信"的原則,組件之間只與直接依賴的組件進行交互,避免形成複雜的依賴鏈,提高系統的穩定性,這就是 迪米特法則的應用。
  • 然後我們通過業務的抽象和代碼複用的機制避免了出現大量代碼重複的情況,例如通過統一的接口定義實現不同組件的複用,降低了維護成本,這個就是DRY原則的應用。

三、總結與下一步計劃

因為篇幅的問題,在這篇文章就先寫這麼多,文章中我們實現了整個RPC框架裏面最核心的組件,包括了序列化模塊、網絡傳輸模塊和服務代理模塊。在實現的過程中,我們重點展示瞭如何將架構設計原則應用到實際代碼中,確保代碼的可擴展性、可維護性和靈活性。

在下一篇文章中,我們將會完成這個RPC框架的剩餘功能,像服務註冊與發現、服務端核心的實現、客户端的負載均衡等模塊,並編寫相關的測試用例來進行完整的測試。同時,也會把項目的代碼一起放上來給大家觀摩和學習。


互動話題:在實現RPC框架的過程中,你認為哪個組件的設計最具挑戰性?為什麼?歡迎在評論區分享你的觀點。

關於作者

Kenyon,資深軟件架構師,15年的軟件開發和技術管理經驗,從程序員做到企業技術高管。多年企業數字化轉型和軟件架構設計經驗,善於幫助企業構建高質量、可維護的軟件系統,目前專注技術管理、架構設計、AI技術應用和落地;全網統一名稱"六邊形架構",歡迎關注交流。

原創不易,轉載請聯繫授權,如果覺得有幫助,請點贊、收藏、轉發三連支持!

快來關注我吧!

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.