动态

详情 返回 返回

深入理解rtmp(二)之C++腳手架搭建 - 动态 详情

前面深入理解rtmp(1)之開發環境搭建中我們已經搭建好服務器,並且利用一些現成的工具可以推送直播流,播放直播流了.這篇文章我們開始搭建從零開發一套rtmp推流拉流sdk,對着協議實現,達到真正的"深入理解".

作為一個碼農,搬磚搬到一定高度就需要"腳手架"來支撐我們"夠得住".為了方面我們把rtmp推拉流sdk實現為一個PC上的命令行程序,當開發調試穩定後,我們可以快速的通過交叉編譯工具編譯到Android/iOS等移動端設備.

1.創建工程

我們使用cmake作為安裝編譯工具,需要安裝cmake,mac下執行brew install cmake.
在我們的rtmpsdk路徑下創建CMakeLists.txt:

//指定cmake最低版本
cmake_minimum_required (VERSION 3.6)

set(CMAKE_INSTALL_PREFIX "${CMAKE_BINARY_DIR}" CACHE PATH "Installation directory" FORCE)
message(STATUS "CMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}")


set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -ffunction-sections -fdata-sections -Os")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -ffunction-sections -fdata-sections -Os")

project (rtmpsdk)

set(SRC_PREFIX "src")

set(SELF_LIBS_OUT ${CMAKE_SYSTEM_NAME}.out)

file(GLOB SELF_SRC_FILES 
    ${SRC_PREFIX}/main.cpp
    )

add_executable(${PROJECT_NAME} ${SELF_SRC_FILES})

創建src目錄,創建main.cpp文件:

#include <iostream>
int main(int argc,char* argv[])
{
    //標準輸出到控制枱
    std::cout << "Hello rtmp server!" << std::endl;
    return 0;    
}

在rtmpsdk下創建cmake_build文件夾作為我們的輸出路徑
在控制枱,我們進入我們的工程路徑後執行:

cd cmake_build

然後執行:

cmake ..
make 

在camke下面生成了編譯中間文件和最終的rtmpsdk文件:
image.png
現在執行一下./rtmpsdk:

$ ./rtmpsdk 
Hello rtmp server!

可以看到我們打印的"Hello rtmp server!",編譯環境已經搭建好了,可以繼續往下實現我們的功能了.

注:我的開發環境是mac,windows環境後面我提供一個docker的centos鏡像作為我們工程的編譯環境.

2.封裝接口

我們想象一下,我們的rtmp應該對外提供什麼接口?封裝什麼數據結構?

  1. 我們要連接我們的服務器,rtmp是基於tcp,那麼我們要創建一個socket網絡套接字,那麼我們需要一個根據url創建對象的接口rtmp_t rtmp_create(const char* url)
  2. 創建socket後我們還需要做一些配置,最基本的我們要配置讀寫超時時間,如果我們的socket沒有超時,我們的讀寫函數一直沒有返回,會導致無法退出的問題,所以我們需要提供一個設置讀寫超時的接口:int rtmp_set_timeout(rtmp_t rtmp, int recv_timeout_ms, int send_timeout_ms)
  3. rtmp有握手過程,接下來需要一個握手接口:int rtmp_handshake(rtmp_t rtmp)
  4. 握手成功後開始連接服務器,提供連接接口:int rtmp_connect_app(rtmp_t rtmp)
  5. 連接成功後通知服務器是拉流還是推流,提供兩個函數:int rtmp_play_stream(rtmp_t rtmp),int rtmp_publish_stream(rtmp_t rtmp)
  6. 可以開始拉流或推流了:int rtmp_read_packet(rtmp_t rtmp, char* type, uint32_t* timestamp, char** data, int* size),int rtmp_write_packet(rtmp_t rtmp, char type, uint32_t timestamp, char* data, int size)
  7. 拉推流結束後,銷燬對象釋放資源:void rtmp_destroy(rtmp_t rtmp)

以播放為例用一個圖表示:
image.png

接口定義好了,我們在src下新建libs目錄,創建我們對外暴露的rtmpsdk.hpp文件:

#ifndef LIB_RTMP_HPP
#define LIB_RTMP_HPP

/**
 * rtmpsdk is a librtmp like library,
 * used to play/publish rtmp stream from/to rtmp server.
 * socket: use sync and block socket to connect/recv/send data with server.
 * depends: no need other libraries; depends on ssl if use complex_handshake.
 * thread-safe: no
 */

#ifndef __STDC_FORMAT_MACROS
    #define __STDC_FORMAT_MACROS
#endif

#include <stdint.h>
#include <sys/types.h>

#ifdef __cplusplus
extern "C"{
#endif



/*************************************************************
 **************************************************************
 * RTMP protocol context
 **************************************************************
 *************************************************************/
// the RTMP handler.
typedef void* rtmp_t;


/**
 * Create a RTMP handler.
 * @param url The RTMP url, for example, rtmp://localhost/live/livestream
 * @remark default timeout to 30s if not set by rtmp_set_timeout.
 * @remark default schema to url_schema_normal, use rtmp_set_schema to change it.
 *
 * @return a rtmp handler, or NULL if error occured.
 */
extern rtmp_t rtmp_create(const char* url);
/**
 * set socket timeout
 * @param recv_timeout_ms the timeout for receiving messages in ms.
 * @param send_timeout_ms the timeout for sending message in ms.
 * @remark user can set timeout once rtmp_create,
 *      or before rtmp_handshake or rtmp_dns_resolve to connect to server.
 * @remark default timeout to 30s if not set by rtmp_set_timeout.
 *
 * @return 0, success; otherswise, failed.
 */
extern int rtmp_set_timeout(rtmp_t rtmp, int recv_timeout_ms, int send_timeout_ms);
/**
 * close and destroy the rtmp stack.
 * @remark, user should never use the rtmp again.
 */
extern void rtmp_destroy(rtmp_t rtmp);

/*************************************************************
 **************************************************************
 * RTMP protocol stack
 **************************************************************
 *************************************************************/
/**
 * connect and handshake with server
 * category: publish/play
 * previous: rtmp-create
 * next: connect-app
 *
 * @return 0, success; otherswise, failed.
 */
/**
 * simple handshake specifies in rtmp 1.0,
 * not depends on ssl.
 */
/**
 *      rtmp_handshake equals to invoke:
 *       rtmp_dns_resolve()
 *       rtmp_connect_server()
 *       rtmp_do_simple_handshake()
 * user can use these functions if needed.
 */
extern int rtmp_handshake(rtmp_t rtmp);


/**
 * Connect to RTMP tcUrl(Vhost/App), similar to flash AS3 NetConnection.connect(tcUrl).
 * @remark When connected to server, user can retrieve informations from RTMP handler,
 *      for example, use rtmp_get_server_id to get server ip/pid/cid.
 * @return 0, success; otherswise, failed.
 */
extern int rtmp_connect_app(rtmp_t rtmp);


/**
 * play a live/vod stream.
 * category: play
 * previous: connect-app
 * next: destroy
 * @return 0, success; otherwise, failed.
 */
extern int rtmp_play_stream(rtmp_t rtmp);

/**
 * publish a live stream.
 * category: publish
 * previous: connect-app
 * next: destroy
 * @return 0, success; otherwise, failed.
 */
extern int rtmp_publish_stream(rtmp_t rtmp);


/**
 * E.4.1 FLV Tag, page 75
 */
// 8 = audio
#define RTMP_TYPE_AUDIO 8
// 9 = video
#define RTMP_TYPE_VIDEO 9
// 18 = script data
#define RTMP_TYPE_SCRIPT 18
/**
 * read a audio/video/script-data packet from rtmp stream.
 * @param type, output the packet type, macros:
 *            RTMP_TYPE_AUDIO, FlvTagAudio
 *            RTMP_TYPE_VIDEO, FlvTagVideo
 *            RTMP_TYPE_SCRIPT, FlvTagScript
 *            otherswise, invalid type.
 * @param timestamp, in ms, overflow in 50days
 * @param data, the packet data, according to type:
 *             FlvTagAudio, @see "E.4.2.1 AUDIODATA"
 *            FlvTagVideo, @see "E.4.3.1 VIDEODATA"
 *            FlvTagScript, @see "E.4.4.1 SCRIPTDATA"
 *            User can free the packet by rtmp_free_packet.
 * @param size, size of packet.
 * @return the error code. 0 for success; otherwise, error.
 *
 * @remark: for read, user must free the data.
 * @remark: for write, user should never free the data, even if error.
 *
 * @return 0, success; otherswise, failed.
 */
extern int rtmp_read_packet(rtmp_t rtmp, char* type, uint32_t* timestamp, char** data, int* size);
// @param data User should never free it anymore.
extern int rtmp_write_packet(rtmp_t rtmp, char type, uint32_t timestamp, char* data, int size);


#ifdef __cplusplus
}
#endif

#endif

接口定義好後,我們開始按步驟實現接口,下面我們開始實現第一步rtmp_create,通過url創建socket.

3.封裝網絡接口

封裝網絡接口前,我們先對linux c網絡編程做一個回顧

3.1linux c socket編程基本流程

我們先來一張圖:
image.png

我們的rtmpsdk作為 tcp客户端,我們再一起了解一下linux c關於socket的api

3.1.1 socket()

函數原型
int socket(int domain, int type, int protocol);
參數説明
  • domain: 協議域,又稱協議族(family)。常用的協議族有 AF_INET 、 AF_INET6 、 AF_LOCAL(或稱AF_UNIX,Unix域Socket)、AF_ROUTE 等。協議族決定了 socket 的地址類型,在通信中必須採用對應的地址,如 AF_INET 決定了要用 ipv4 地址(32位的)與端口號(16位的)的組合、AF_UNIX 決定了要用一個絕對路徑名作為地址。
  • type: 指定 Socket 類型。常用的 socket 類型有 SOCK_STREAM 、 SOCK_DGRAM 、 SOCK_RAW 、 SOCK_PACKET 、 SOCK_SEQPACKET 等。流式 Socket(SOCK_STREAM)是一種面向連接的 Socket,針對於面向連接的 TCP 服務應用。數據報式 Socket(SOCK_DGRAM)是一種無連接的 Socket,對應於無連接的 UDP 服務應用。
  • protocol: 指定協議。常用協議有 IPPROTO_TCP 、 IPPROTO_UDP 、 IPPROTO_STCP 、 IPPROTO_TIPC 等,分別對應 TCP 傳輸協議、UDP 傳輸協議、STCP 傳輸協議、TIPC 傳輸協議。

注意:1.type 和 protocol 不可以隨意組合,如 SOCK_STREAM 不可以跟 IPPROTO_UDP 組合。當第三個參數為0時,會自動選擇第二個參數類型對應的默認協議。

返回值

如果調用成功就返回新創建的套接字的描述符,如果失敗就返回 INVALID_SOCKET(Linux下失敗返回-1)。

套接字描述符是一個整數類型的值。每個進程的進程空間裏都有一個套接字描述符表,該表中存放着套接字描述符和套接字數據結構的對應關係。該表中有一個字段存放新創建的套接字的描述符,另一個字段存放套接字數據結構的地址,因此根據套接字描述符就可以找到其對應的套接字數據結構。每個進程在自己的進程空間裏都有一個套接字描述符表但是套接字數據結構都是在操作系統的內核緩衝裏。

3.1.2 bind()

bind()函數把一個地址族中的特定地址賦給socket。例如對應 AF_INET、AF_INET6 就是把一個 ipv4 或 ipv6 地址和端口號組合賦給socket。

函數原型
int bind(int socketfd, const struct sockaddr *addr, socklen_t addrlen);
參數説明
  • socketfd: 一個標識已連接套接口的描述字。
  • address: 是一個sockaddr結構指針,該結構中包含了要結合的地址和端口號。
  • address_len: 確定 address 緩衝區的長度。

其中,sockaddr 這個地址結構根據地址創建 socket 時的地址協議族的不同而不同。

如ipv4對應的是:

struct sockaddr_in {
    sa_family_t    sin_family; /* address family: AF_INET */
    in_port_t      sin_port;   /* port in network byte order */
    struct in_addr sin_addr;   /* internet address */
};
/* Internet address. */
struct in_addr {
    uint32_t       s_addr;     /* address in network byte order */
};

ipv6對應的是:

struct sockaddr_in6 { 
    sa_family_t     sin6_family;   /* AF_INET6 */ 
    in_port_t       sin6_port;     /* port number */ 
    uint32_t        sin6_flowinfo; /* IPv6 flow information */ 
    struct in6_addr sin6_addr;     /* IPv6 address */ 
    uint32_t        sin6_scope_id; /* Scope ID (new in 2.4) */ 
};

struct in6_addr { 
    unsigned char   s6_addr[16];   /* IPv6 address */ 
};

Unix域對應的是:

#define UNIX_PATH_MAX    108
struct sockaddr_un { 
    sa_family_t sun_family;               /* AF_UNIX */ 
    char        sun_path[UNIX_PATH_MAX];  /* pathname */ 
};
返回值

如果函數執行成功,返回值為0,否則為SOCKET_ERROR。

3.1.3listen()

如果作為一個服務器,在調用socket()、bind()之後就會調用listen()來監聽這個socket,如果客户端這時調用connect()發出連接請求,服務器端就會接收到這個請求。

函數原型
int listen(int socketfd, int backlog);
參數説明
  • socketfd: 要監聽的socket的描述字。
  • backlog: 相應socket可以排隊的最大連接個數。

socket()函數創建的socket默認是一個主動類型的,listen函數將socket變為被動類型的,等待客户的連接請求。

3.1.4connect()

函數原型
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
參數説明
  • socketfd: 客户端socket的描述字。
  • sockaddr: 服務器的socket地址。
  • addrlen: socket地址的長度

3.1.5. accept()

TCP服務器端依次調用 socket()、bind()、listen() 之後,就會監聽指定的 socket 地址了。TCP客户端依次調用 socket()、connect() 之後就向 TCP 服務器發送了一個連接請求。TCP 服務器監聽到這個請求之後,就會調用 accept() 函數取接收請求,這樣連接就建立好了。之後就可以開始網絡I/O操作了,即類同於普通文件的讀寫I/O操作。

函數原型
int accept(int socketfd, struct sockaddr *addr, socklen_t *addrlen); //返回連接connect_fd
參數説明
  • socketfd: 就是上面解釋中的監聽套接字,這個套接字用來監聽一個端口,當有一個客户與服務器連接時,它使用這個一個端口號,而此時這個端口號正與這個套接字關聯。當然客户不知道套接字這些細節,它只知道一個地址和一個端口號。
  • sockaddr: 結果參數,它用來接受一個返回值,這返回值指定客户端的地址,當然這個地址是通過某個地址結構來描述的,用户應該知道這一個什麼樣的地址結構。如果對客户的地址不感興趣,那麼可以把這個值設置為NULL。
  • len: 它也是結果的參數,用來接受上述 addr 的結構的大小的,它指明 addr 結構所佔有的字節個數。同樣的,它也可以被設置為NULL。

如果accept成功返回,則服務器與客户已經正確建立連接了,此時服務器通過accept返回的套接字來完成與客户的通信。

accept默認會阻塞進程,直到有一個客户連接建立後返回,它返回的是一個新可用的套接字,這個套接字是連接套接字。

  • 監聽套接字: 監聽套接字正如accept的參數sockfd,它是監聽套接字,在調用listen函數之後,是服務器開始調用socket()函數生成的,稱為監聽socket描述字(監聽套接字)
  • 連接套接字:一個套接字會從主動連接的套接字變身為一個監聽套接字;而accept函數返回的是已連接socket描述字(一個連接套接字),它代表着一個網絡已經存在的點點連接。

一個服務器通常通常僅僅只創建一個監聽socket描述字,它在該服務器的生命週期內一直存在。內核為每個由服務器進程接受的客户連接創建了一個已連接socket描述字,當服務器完成了對某個客户的服務,相應的已連接socket描述字就被關閉。

連接套接字socketfd_new 並沒有佔用新的端口與客户端通信,依然使用的是與監聽套接字socketfd一樣的端口號

3.1.6. read()、write()等

當服務器與客户端已經建立好連接,可以調用網絡I/O進行讀寫操作了,即實現了網咯中不同進程之間的通信!網絡I/O操作有下面幾組:

read()/write()
recv()/send()
readv()/writev()
recvmsg()/sendmsg()
recvfrom()/sendto()
函數原型1
int recv(SOCKET socket, char FAR* buf, int len, int flags);
參數説明1
  • socket: 一個標識已連接套接口的描述字。
  • buf: 用於接收數據的緩衝區。
  • len: 緩衝區長度。
  • flags: 指定調用方式。取值:MSG_PEEK 查看當前數據,數據將被複制到緩衝區中,但並不從輸入隊列中刪除;MSG_OOB 處理帶外數據。
    若無錯誤發生,recv()返回讀入的字節數。如果連接已中止,返回0。否則的話,返回SOCKET_ERROR錯誤,應用程序可通過WSAGetLastError()獲取相應錯誤代碼。
函數原型2
ssize_t recvfrom(int sockfd, void buf, int len, unsigned int flags, struct socketaddr* from, socket_t* fromlen);
參數説明2
  • sockfd: 標識一個已連接套接口的描述字。
  • buf: 接收數據緩衝區。
  • len: 緩衝區長度。
  • flags: 調用操作方式。是以下一個或者多個標誌的組合體,可通過or操作連在一起:
  • MSG_DONTWAIT:操作不會被阻塞;
  • MSG_ERRQUEUE: 指示應該從套接字的錯誤隊列上接收錯誤值,依據不同的協議,錯誤值以某種輔佐性消息的方式傳遞進來,使用者應該提供足夠大的緩衝區。導致錯誤的原封包通過msg_iovec作為一般的數據來傳遞。導致錯誤的數據報原目標地址作為msg_name被提供。錯誤以sock_extended_err結構形態被使用。
  • MSG_PEEK:指示數據接收後,在接收隊列中保留原數據,不將其刪除,隨後的讀操作還可以接收相同的數據。
  • MSG_TRUNC:返回封包的實際長度,即使它比所提供的緩衝區更長, 只對packet套接字有效。
  • MSG_WAITALL:要求阻塞操作,直到請求得到完整的滿足。然而,如果捕捉到信號,錯誤或者連接斷開發生,或者下次被接收的數據類型不同,仍會返回少於請求量的數據。
  • MSG_EOR:指示記錄的結束,返回的數據完成一個記錄。
  • MSG_TRUNC:指明數據報尾部數據已被丟棄,因為它比所提供的緩衝區需要更多的空間。
  • MSG_CTRUNC:指明由於緩衝區空間不足,一些控制數據已被丟棄。(MSG_TRUNC使用錯誤,4才是MSG_TRUNC的正確解釋)
  • MSG_OOB:指示接收到out-of-band數據(即需要優先處理的數據)。
  • MSG_ERRQUEUE:指示除了來自套接字錯誤隊列的錯誤外,沒有接收到其它數據。
  • from:(可選)指針,指向裝有源地址的緩衝區。
  • fromlen:(可選)指針,指向from緩衝區長度值。
函數原型3
int sendto( SOCKET s, const char FAR* buf, int size, int flags, const struct sockaddr FAR* to, int tolen);
參數説明3
  • s: 套接字
  • buf: 待發送數據的緩衝區
  • size: 緩衝區長度
  • flags: 調用方式標誌位, 一般為0, 改變Flags,將會改變Sendto發送的形式
  • addr: (可選)指針,指向目的套接字的地址
  • tolen: addr所指地址的長度
    如果成功,則返回發送的字節數,失敗則返回SOCKET_ERROR。
函數原型4
int accept( int fd, struct socketaddr* addr, socklen_t* len);
參數説明4
  • fd: 套接字描述符。
  • addr: 返回連接着的地址
  • len: 接收返回地址的緩衝區長度
    成功返回客户端的文件描述符,失敗返回-1。

3.1.7. close()

在服務器與客户端建立連接之後,會進行一些讀寫操作,完成了讀寫操作就要關閉相應的socket描述字。

函數原型
int close(int fd);

close一個TCP socket的缺省行為時把該socket標記為以關閉,然後立即返回到調用進程。該描述字不能再由調用進程使用,也就是説不能再作為read或write的第一個參數。

注意:close操作只是使相應socket描述字的引用計數-1,只有當引用計數為0的時候,才會觸發TCP客户端向服務器發送終止連接請求。

3.2封裝socket

我們把socket和超時配置等封裝到一個結構體:

struct BlockSyncSocket
{
    SOCKET fd;
    int    family;
    int64_t rbytes;
    int64_t sbytes;
    // The send/recv timeout in ms.
    int64_t rtm;
    int64_t stm;

    BlockSyncSocket() {
        stm = rtm = UTIME_NO_TIMEOUT;
        rbytes = sbytes = 0;

        SOCKET_RESET(fd);
        SOCKET_SETUP();
    }

    virtual ~BlockSyncSocket() {
        SOCKET_CLOSE(fd);
        SOCKET_CLEANUP();
    }
};

通過上面分析知,我們需要設計socket創建,連接,讀寫,設置超時等:

/**
 * simple socket stream,
 * use tcp socket, sync block mode
 */
class SimpleSocketStream
{
private:
    BlockSyncSocket* io;
public:
    SimpleSocketStream();
    virtual ~SimpleSocketStream();
public:
    virtual BlockSyncSocket* hijack_io();
    virtual int create_socket(std::string url);
    virtual int connect(const char* server, int port);

public:
    virtual error_t read(void* buf, size_t size, ssize_t* nread);

public:
    virtual void set_recv_timeout(utime_t tm);
    virtual utime_t get_recv_timeout();
    virtual int64_t get_recv_bytes();
public:
    virtual void set_send_timeout(utime_t tm);
    virtual utime_t get_send_timeout();
    virtual int64_t get_send_bytes();
    virtual error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
public:
    virtual error_t read_fully(void* buf, size_t size, ssize_t* nread);
    virtual error_t write(void* buf, size_t size, ssize_t* nwrite);
};

接下來我們實現網絡封裝接口:

#include <netinet/tcp.h>

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/uio.h>


#include <sys/types.h>
#include <errno.h>
#include <stdio.h>
#include <netdb.h>

#include <bs_socket.hpp>




BlockSyncSocket* hijack_io_create()
{
    BlockSyncSocket* skt = new BlockSyncSocket();
    return skt;
}
void hijack_io_destroy(BlockSyncSocket* ctx)
{
    freep(ctx);
}
int hijack_io_create_socket(BlockSyncSocket* skt,std::string url)
{
    skt->family = AF_INET6;
    skt->fd = ::socket(skt->family, SOCK_STREAM, 0);   // Try IPv6 first.
    if (!SOCKET_VALID(skt->fd)) {
        skt->family = AF_INET;
        skt->fd = ::socket(skt->family, SOCK_STREAM, 0);   // Try IPv4 instead, if IPv6 fails.
    }
    if (!SOCKET_VALID(skt->fd)) {
        return ERROR_SOCKET_CREATE;
    }

    // No TCP cache.
    int v = 1;
    setsockopt(skt->fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));

    return ERROR_SUCCESS;
}
int hijack_io_connect(BlockSyncSocket* skt, const char* server_ip, int port)
{
    char sport[8];
    snprintf(sport, sizeof(sport), "%d", port);

    addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family   = skt->family;
    hints.ai_socktype = SOCK_STREAM;

    addrinfo* r  = NULL;
    AutoFree(addrinfo, r);
    if(getaddrinfo(server_ip, sport, (const addrinfo*)&hints, &r)) {
        return ERROR_SOCKET_CONNECT;
    }

    if(::connect(skt->fd, r->ai_addr, r->ai_addrlen) < 0){
        return ERROR_SOCKET_CONNECT;
    }

    return ERROR_SUCCESS;
}
int hijack_io_read(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nread)
{
    int ret = ERROR_SUCCESS;

    ssize_t nb_read = ::recv(skt->fd, (char*)buf, size, 0);

    if (nread) {
        *nread = nb_read;
    }

    // On success a non-negative integer indicating the number of bytes actually read is returned
    // (a value of 0 means the network connection is closed or end of file is reached).
    if (nb_read <= 0) {
        if (nb_read < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }

        if (nb_read == 0) {
            errno = SOCKET_ECONNRESET;
        }

        return ERROR_SOCKET_READ;
    }

    skt->rbytes += nb_read;

    return ret;
}
int hijack_io_set_recv_timeout(BlockSyncSocket* skt, int64_t tm)
{

    // The default for this option is zero,
    // which indicates that a receive operation shall not time out.
    int32_t sec = 0;
    int32_t usec = 0;

    if (tm != UTIME_NO_TIMEOUT) {
        sec = (int32_t)(tm / 1000);
        usec = (int32_t)((tm % 1000)*1000);
    }

    struct timeval tv = { sec , usec };
    if (setsockopt(skt->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
        return SOCKET_ERRNO();
    }

    skt->rtm = tm;

    return ERROR_SUCCESS;
}

int hijack_io_set_send_timeout(BlockSyncSocket* skt, int64_t tm)
{

    // The default for this option is zero,
    // which indicates that a receive operation shall not time out.
    int32_t sec = 0;
    int32_t usec = 0;

    if (tm != UTIME_NO_TIMEOUT) {
        sec = (int32_t)(tm / 1000);
        usec = (int32_t)((tm % 1000)*1000);
    }

    struct timeval tv = { sec , usec };
    if (setsockopt(skt->fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
        return SOCKET_ERRNO();
    }

    skt->stm = tm;

    return ERROR_SUCCESS;
}

int hijack_io_writev(BlockSyncSocket* skt, const iovec *iov, int iov_size, ssize_t* nwrite)
{

    int ret = ERROR_SUCCESS;

    ssize_t nb_write = ::writev(skt->fd, iov, iov_size);

    if (nwrite) {
        *nwrite = nb_write;
    }

    // On  success,  the  readv()  function  returns the number of bytes read;
    // the writev() function returns the number of bytes written.  On error, -1 is
    // returned, and errno is set appropriately.
    if (nb_write <= 0) {
        if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }

        return ERROR_SOCKET_WRITE;
    }

    skt->sbytes += nb_write;

    return ret;
}

int hijack_io_read_fully(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nread)
{
    int ret = ERROR_SUCCESS;

    size_t left = size;
    ssize_t nb_read = 0;

    while (left > 0) {
        char* this_buf = (char*)buf + nb_read;
        ssize_t this_nread;

        if ((ret = hijack_io_read(skt, this_buf, left, &this_nread)) != ERROR_SUCCESS) {
            return ret;
        }

        nb_read += this_nread;
        left -= (size_t)this_nread;
    }

    if (nread) {
        *nread = nb_read;
    }
    skt->rbytes += nb_read;

    return ret;
}
int hijack_io_write(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nwrite)
{

    int ret = ERROR_SUCCESS;

    ssize_t nb_write = ::send(skt->fd, (char*)buf, size, 0);

    if (nwrite) {
        *nwrite = nb_write;
    }

    if (nb_write <= 0) {
        if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }

        return ERROR_SOCKET_WRITE;
    }

    skt->sbytes += nb_write;

    return ret;
}


error_t SimpleSocketStream::read(void* buf, size_t size, ssize_t* nread)
{
    assert(io);
    int ret = hijack_io_read(io, buf, size, nread);
    if (ret != ERROR_SUCCESS) {
        return error_new(ret, "read");
    }
    return success;
}

接下來我們就可以在我們的main函數裏面創建SimpleSocketStream,然後創建socket了.下一篇我們開始通過創建的socket進行rtmp握手.

3.3測試

在我們的main.cpp中:

#include <iostream>
#include <bs_socket.hpp>
int main(int argc,char* argv[])
{
    std::cout << "Hello rtmp server!" << std::endl;
    SimpleSocketStream *sss = new SimpleSocketStream();
    if(sss->create_socket("rtmp://127.0.0.1:1935/live/livestream") != 0){
        printf("create socket error!");
        return -1;
    }
    std::cout<< "create fd = " << sss->hijack_io()->fd << std::endl;
    free(sss);
    return 0;    
}

輸出結果:

$ ./rtmpsdk 
Hello rtmp server!
create fd = 3

我們成功創建了句柄為3的socket.

題外話

linux網絡編程中有同步/異步,阻塞/非阻塞,由於我們現在sdk是客户端,沒有併發連接的問題,所以我們的實現使用阻塞同步socket.
我們在創建socket時兼容了ipv6,先嚐試ipv6,如果失敗了再嘗試ipv4:

int hijack_io_create_socket(BlockSyncSocket* skt,std::string url)
{
    skt->family = AF_INET6;
    skt->fd = ::socket(skt->family, SOCK_STREAM, 0);   // Try IPv6 first.
    if (!SOCKET_VALID(skt->fd)) {
        skt->family = AF_INET;
        skt->fd = ::socket(skt->family, SOCK_STREAM, 0);   // Try IPv4 instead, if IPv6 fails.
    }
    if (!SOCKET_VALID(skt->fd)) {
        return ERROR_SOCKET_CREATE;
    }

    // No TCP cache.
    int v = 1;
    setsockopt(skt->fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));

    return ERROR_SUCCESS;
}

setsockopt可以對socket進行設置,這裏:IPPROTO_TCP 和 IPPROTO_IP代表兩種不同的協議,分別代表IP協議族裏面的TCP協議和IP協議
TCP_NODELAY是什麼呢?
TCP/IP協議中針對TCP默認開啓了Nagle算法。Nagle算法通過減少需要傳輸的數據包,來優化網絡。在內核實現中,數據包的發送和接受會先做緩存,分別對應於寫緩存和讀緩存。

啓動TCP_NODELAY,就意味着禁用了Nagle算法,允許小包的發送。對於延時敏感型,同時數據傳輸量比較小的應用,開啓TCP_NODELAY選項無疑是一個正確的選擇。rtmp是直播流式傳輸,對延時比較敏感,所以我們關閉了NODELAY.同時比如,對於SSH會話,用户在遠程敲擊鍵盤發出指令的速度相對於網絡帶寬能力來説,絕對不是在一個量級上的,所以數據傳輸非常少;而又要求用户的輸入能夠及時獲得返回,有較低的延時。如果開啓了Nagle算法,就很可能出現頻繁的延時,導致用户體驗極差。當然,你也可以選擇在應用層進行buffer,比如使用java中的buffered stream,儘可能地將大包寫入到內核的寫緩存進行發送;vectored I/O(writev接口)也是個不錯的選擇。

對於關閉TCP_NODELAY,則是應用了Nagle算法。數據只有在寫緩存中累積到一定量之後,才會被髮送出去,這樣明顯提高了網絡利用率(實際傳輸數據payload與協議頭的比例大大提高)。但是這又不可避免地增加了延時;與TCP delayed ack這個特性結合,這個問題會更加顯著,延時基本在40ms左右。當然這個問題只有在連續進行兩次寫操作的時候,才會暴露出來。

連續進行多次對小數據包的寫操作,然後進行讀操作,本身就不是一個好的網絡編程模式;在應用層就應該進行優化。
對於既要求低延時,又有大量小數據傳輸,還同時想提高網絡利用率的應用,大概只能用UDP自己在應用層來實現可靠性保證了。

user avatar videocloud 头像 pannideshoutao 头像 yushang_66b0e8718bd85 头像 ZhongQianwen 头像 mamaster777 头像 CH3CH2OH-Blog 头像 zdyz 头像 verd 头像 aerfazhe 头像 xvrzhao 头像 womaxuanhuang 头像 shiluodexiaomaju 头像
点赞 13 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.