博客 / 詳情

返回

一次RPC請求過程

最近給SRPC項目寫幾篇學習文章,希望協助小夥伴通過這個輕量級的框架快速瞭解RPC相關內容。
本篇為第二篇,注重於解讀一次RPC請求的過程,是最簡單、最主幹的部分,而裏邊每一個層級怎麼做資源調度和複用都不會包括在內,因此有基礎的小夥伴可以直接跳讀源碼解析。

1. RPC概念簡述

SRPC項目地址:https://github.com/sogou/srpc

花一點點時間補充RPC的基本概念,其字面意思是Remote Procedure Call,遠程過程調用。也就是説:

  • 如果我們是客户端,通過RPC調用,把某些事情交給遠程機器去做;
  • 如果我們是服務端,就是被調用,別人交一些事情讓我做;

那麼必然涉及到三個小問題:

  1. 請求是什麼?
  2. 怎麼指定調用哪個過程?
  3. 怎麼給填好回覆?

整個調用由客户端發起。服務端啓動服務器之後,等待他人調用。

我們舉個小例子,上述三個小問題可以這樣:

  1. 請求是int aint b
  2. 指定調用對方的sum
  3. 回覆是求和後的值int ret

一會兒用這個例子看看RPC框架的代碼是怎麼做的。

2. 協議與框架

我們常提到的RPC可能是一種框架:

用來幫我們做網絡收發。

比如SRPC是個輕量級的RPC框架,還有大家熟知的GRPC、Thrift等。

也可能是一種協議

RPC協議讓不同語言、不同框架都可以互通。

個人理解協議的本質是為了生態服務的,RPC承擔的是銜接整個生態系統的橋樑。

兩者關係:

以SRPC為例,支持多種協議,包括SRPCThriftBRPCtRPC,另外還可以收發HTTP協議

我們給出一張RPC請求過程的圖及其中涉及到的關鍵函數接口,然後正式開始下面的學習。

這裏我們看到幾個有意思的事情:

  1. 請求/回覆,是對稱的。
  2. 對客户端Client來説,請求時發出SRPCRequest,收到SRPCResponse;
  3. 對服務端Server來説,收到SRPCRequest,回覆時發出SRPCResponse;
  4. 收/發,接口是對稱的。
  5. 發消息的接口都是encode(),無論我要發的是SRPCRequest還是SRPCResponse;
  6. 收消息的接口都是append(),無論我要收的是SRPCRequest還是SRPCResponse;
  7. Client/Server,也是對稱的。
  8. Client主動發出請求,然後回覆時是被動調起callback()的(哪怕我們用同步接口,那也是調用完代碼再往下走);
  9. Server被動接收請求,然後回覆是process()處理完之後主動進行的。

3. 定義RPC接口

我們剛才三個小問題怎麼定義呢?可以使用protobuf作為接口描述文件:

// [ MyService.proto ]
syntax="proto2";

message Request { // request包含了a和b
    required int32 a = 1;
    required int32 b = 2;
};
                                                                                   
message Response { // response包含了ret
    required int32 ret = 1;
};

service MyService { // 服務名,用來區分我們的服務
    rpc Sum(Request) returns (Response); // 調用名字為Sum的函數
};

也可以配合srpc小工具的api命令,產生一個簡單的protobuf描述文件,並進行修改。命令參考如下:

./srpc api MyService

然後就可以根據提示,打開MyService.proto並編輯其中的接口定義。

4. step-0 : client發出請求

如總圖的step-0,我們想要發出請求,就需要調用上述定義的RPC接口Sum( )

// [ client_main.cc ]
int main()
{                                                    
    MyService::SRPCClient client("127.0.0.1", 1412);                                                                                           
                                                                                                                                      
    Request req;   // 準備好Request
    Response resp; // 準備好Response
    RPCSyncContext ctx; // 一些必要的請求上下文,包括調用狀態碼

    req.set_a(1);  // 填a
    req.set_b(2);  // 填b
    client.Sum(&req, &resp, &ctx); // 調用Sum()
    ...
}

當然想要框架知道怎麼從上述的protobuf文件進行調用,我們需要一些代碼生成工作。這不是本篇的重點,因此這裏僅列出一些命令供大家運行起來。

我們根據剛才srpc小工具的示例,通過改好的proto文件把項目生成出來:

./srpc rpc my_rpc_project -f MyService.proto -p ./

我們打開生成代碼MyService.srpc.h,可以看到剛才調用的Sum()函數的異步接口和同步接口,定義如下:

// [ MyService.srpc.h ]
class SRPCClient : public srpc::SRPCClient                                         
{                                                                                  
public:
    void Sum(const Request *req, SumDone done);
    void Sum(const Request *req, Response *resp, srpc::RPCSyncContext *sync_ctx);
    ...
};

5. step-1:框架為Client發出請求

以上,我們作為RPC的用户,代碼就告一段落了。

接下來交給RPC框架幹活,它要做的事情包括但不僅限於:

把這個請求內容、以及用户要調用哪個服務(service)的哪個函數(method)告訴遠程,並通過網絡發送出去。

我們想要了解一個框架如何工作時,首先要了解它是基於什麼構建起來的,包括什麼語言什麼底層網絡收發庫等。

SRPC是基於Workflow的任務流編程範式開發的,並使用了其攜帶的網絡收發功能,因此我們可以不用手寫I/O多路複用等事情,但是開發需要遵循Workflow的編程規範,即:任務流

我們可以認為對於網絡任務來説,一次會話就是一個task,對於client我們的task職責就在於把Request發給對方收回Response

繼續圍觀生成代碼MyService.srpc.h,我們看一下最簡單的異步接口實現是什麼:

// [ MyService.srpc.h ]
inline void SRPCClient::Sum(const Request *req, SumDone done)                   
{                                                                               
    auto *task = this->create_rpc_client_task("Sum", std::move(done));          
                                                                                
    task->serialize_input(req);                                                 
    task->start();                                                              
} 

內部會構造出一個RPCClientTask,它被task->start();之後,就可以認為請求交給框架,用户態無需再關心,直到回覆時框架通過回調等機制叫醒用户代碼。

由於RPCClientTask的定義比較複雜,我們挑重點看:

// [ rc/rpc_task.inl ]
// 1. 它派生於Workflow的WFComplexClientTask,
//    以REQ,RESP為模版,定義了內部請求與回覆的格式,
//    我們這裏分別是SRPCRequest和SRPCResponse。
template<class RPCREQ, class RPCRESP>                                              
class RPCClientTask : public WFComplexClientTask<RPCREQ, RPCRESP>
{
    ...
protected:
    // 2. SPRC框架重新實現了父類的方法message_out(),
    //    用來告訴Workflow網絡層面這次發出的請求內容時啥
    CommMessageOut *message_out() override;

    // 3. 保存了一個rpc_callback, 讓網絡回覆了之後通知SRPC框架
    //    SRPC框架再去做網絡請求到用户Response的格式轉換
    void rpc_callback(WFNetworkTask<RPCREQ, RPCRESP> *task);
};

上述的RPCREQ就是我們發出的請求,SRPCRequestSRPCResponse都從SRPCMessage派生:

// [ src/message/rpc_message_srpc.h ]
class SRPCRequest : public SRPCMessage
{
    ...
};

class SRPCResponse : public SRPCMessage
{
    ...
};

那麼誰定義了 SRPCMessage的內存結構呢?就是SRPC協議。下圖可以清晰地看到,我們在SRPC協議頭部就有meta部分,上述提到的servicemethod就是填在裏邊。而後面的message就是我們的Request。

我們把消息按照上述結構,通過SRPCMessage::encode()接口填好。這是Workflow的接口,它會在進行網絡發送時entry->session->out->encode()被調用。

// [ src/message/rpc_message_srpc.h ]
inline int SRPCMessage::encode(struct iovec vectors[], int max, size_t size_limit)
{
    // 這裏用上了RPC協議,我們按照協議結構填內容。
}

6. step-2:與操作系統相關的網絡操作

這部分在Workflow中實現,涉及到的網絡基礎知識很多,後續會針對性展開寫學習心得,包括:

  • 命名服務
  • 目標選取
  • 負載均衡
  • 連接管理
  • IO多路複用

等等,現在暫時跳過。

7. server接收請求

我們切換一下視角,來到上述總圖的右半邊,server要接收請求了。

當然server作為一個被動接收者,它需要先被用户啓動起來。以下是用户代碼:

// [ server_main.cc ]
int main()                                                                         
{
    SRPCServer server; // 1. 構造一個server,負責網絡請求

    MyServiceServiceImpl impl; // 2. 構造一個服務,負責實現Sum
    server.add_service(&impl);  // 3. 把服務實現加到server中
                                                           
    if (server.start(1412) == 0)  // 4. 傳入端口,把server跑起來
    {
        printf("my_rpc_project SRPC server started, port %u\n", 1412);
        wait_group.wait(); // 5. server start也是異步的,暫時要卡住主線程不退出
        server.stop();
    }
    else                                                                           
        perror("server start");                                                    
                                
    return 0;                                                                      
} 

然後就可以愉快地按照SRPC協議來接受請求了。

這是誰來做的呢?RPCServer來做的。

8. step-4:框架為server接受請求

// [ src/rpc_server.h ]
// 1. 從Workflow的WFServer派生
//    由RPCTYPE::REQ和RPCTYPE::RESP來指定請求與回覆的類型
template<class RPCTYPE>
class RPCServer : public WFServer<typename RPCTYPE::REQ,
                                  typename RPCTYPE::RESP>
{
...
protected:
    //  2. 需要實現怎麼構造一次會話,即RPCServerTask
    CommSession *new_session(long long seq, CommConnection *conn) override;
    // 3. 調用具體server接口的地方
    void server_process(NETWORKTASK *task) const;
    ...
};

我們的父類WFServer是可以幫我們按照某種協議收網絡包的,只需要:

  1. 我們實現new_session(), new 一個RPCServerTask給它;
  2. 在模版參中指定的RPCTYPE::REQ上實現append()接口,指引Workflow網絡層面如何從操作系統收到的數據上切一份完整的REQ下來。

其中第一步不是必須的,但我們SRPC框架需要,因為我們在本次會話有一些上下文要處理。但本文中我們只需關心REQ。

這個REQ就是SRPCRequest,父類就是SRPCMessage,剛才也有提到過它的encode()實現,現在看看它的append()實現:

// [ src/message/rpc_message_srpc.cc ]
int SRPCMessage::append(const void *buf, size_t *size, size_t size_limit)
{
    ... // 把網絡收到的一批buf,按照RPC協議保存到我的內存裏,
    ... // 並通過返回值告知核心我收發完沒有,因為內部需要維護狀態
}

Workflow會不停調用這個append()來把SRPC協議圖裏的消息收完。

我們這裏通過返回值來告知Workflow的網絡層本條消息的接收情況:

  • 1:消息接受完成;
  • 0:未完成,繼續收;
  • < 0:錯誤;

只要返回1,流程就會繼續往下走,也就是到了process()函數。

9. step-5:調用開發者的rpc函數

SRPC框架收完消息之後,需要對meta進行一些處理:

  • 根據meta裏的service去找到用户剛才server.add_service(impl)時的那個service;
  • 根據meta裏的method去找用户的impl裏實現的函數;

然後就可以調用server端開發者實現的rpc函數了。

查找過程很簡單,以下是簡化的流程:

// [ src/rpc_server.h ]
template<class RPCTYPE>                                                            
void RPCServer<RPCTYPE>::server_process(NETWORKTASK *task) const
{
    // 1. 把SRPC協議中的meta信息反序列化出來
    req->deserialize_meta(); 
    ...
    // 2. 找service對象
    auto *service = this->find_service(req->get_service_name());
    ...
    // 3. 找method對象
    auto *rpc = service->find_method(req->get_method_name());
    ...
    // 4. 進一步處理
    status_code = (*rpc)(server_task->worker);
    ...
}

注意上述的進一步處理是因為,我們還需要對body進行反序列化:

// [ src/rpc_service.h ]
template<class INPUT, class OUTPUT, class SERVICE>                                 
static inline int                                                                  
ServiceRPCCallImpl(SERVICE *service,                                               
                   RPCWorker& worker,                                              
                   void (SERVICE::*rpc)(INPUT *, OUTPUT *, RPCContext *))          
{
    // 1. new一片請求,是一開始定義的包含a和b的Request,它是個ProtobufMessage
    auto *in = new INPUT;

    // 2. 按照網絡包裏的body,從req反序列化處出來到in上
    int status_code = worker.req->deserialize(in);

    // 3. new一片回覆,是一開始定義的包含ret的Response,它也是個ProtobufMessage
    auto *out = new OUTPUT;

    // 4. 調用用户代碼實現的rpc函數,進行計算
    (service->*rpc)(in, out, worker.ctx);
}

之後就可以交給框架做回覆返回的事情了。

10. 對稱的回程

我們最後簡單看一下用户代碼裏一般長啥樣,也就是剛才impl裏的rpc實現:

// [ server_main.cc ]
class MyServiceServiceImpl : public MyService::Service                          
{                                                                               
public:
    void Sum(Request *request, Response *response, srpc::RPCContext *ctx) override
    {
        // 這裏是我們自己實現的加法
        response->set_ret(request->a() + request->b());                                         
    }                                                                           
}; 

之後,用户無需進行任何代碼編寫,SRPC和Workflow會進行step-6和step-7,與先前的步驟類似且對稱地,把回覆填好併發出。

而client端又會先從Workflow和SRPC進行step-8和step-9,同樣與上述步驟類型且對稱地,把回覆收好,並調用到我們的callback,或者在同步接口中(也就是文中的Sum調用示例)填好Response,此次請求就完整結束了。

// [ client_main.cc ]
int main()
{
    ... 
    client.Sum(&req, &resp, &ctx);
                                                             
    if (ctx.success)                                                               
        fprintf(stderr, "ret = %d\n", resp.ret());            
    else                                                                           
        fprintf(stderr, "sync status[%d] error[%d] errmsg:%s\n", ctx.status_code, ctx.error, ctx.errmsg.c_str()); 

    return 0;
}

附上從調用模塊角度來看的one round圖:

更多內容參考:https://github.com/sogou/srpc/blob/master/docs/wiki.md

user avatar lafengdehuanghuacai 頭像 anne-brady 頭像 guhejiahongdoumianbao 頭像 unka_malloc 頭像 wangying_5ea4fb9de961c 頭像 keen_626105e1ef632 頭像
6 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.