博客 / 詳情

返回

徹底學會使用epoll(二)——ET的讀寫操作實例分析

相關視頻推薦
面試中正經“八股文”網絡原理tcp/udp,網絡編程epoll/reactor
epoll 原理剖析 以及 reactor 模型應用
epoll原理剖析以及三握四揮的處理
LinuxC++後台服務器開發架構師免費學習地址
徹底學會使用epoll(一)——ET模式實現分析
接上一篇
首先,看程序四的例子。
l 程序四

#include <unistd.h>
#include <iostream>
#include <sys/epoll.h>
using namespace std;
int main(void)
{
    int epfd,nfds;
    struct epoll_event ev,events[5];//ev用於註冊事件,數組用於返回要處理的事件
    epfd=epoll_create(1);//只需要監聽一個描述符——標準輸出
    ev.data.fd=STDOUT_FILENO;
    ev.events=EPOLLOUT|EPOLLET;//監聽讀狀態同時設置ET模式
    epoll_ctl(epfd,EPOLL_CTL_ADD,STDOUT_FILENO,&ev);//註冊epoll事件
    for(;;)
   {
      nfds=epoll_wait(epfd,events,5,-1);
      for(int i=0;i<nfds;i++)
     {
         if(events[i].data.fd==STDOUT_FILENO)
             cout<<"hello world!"<<endl;
     }
   }
};

這個程序的功能是隻要標準輸出寫就緒,就輸出“hello world!”。
運行結果:
image.png

我們發現這將是一個死循環。下面具體分析一下這個程序的執行過程:
(1) 首先初始buffer為空,buffer中有空間可寫,這時無論是ET還是LT都會將對應的epitem加入rdlist(對應第一節圖中的紅線),導致epoll_wait就返回寫就緒。
(2) 程序想標準輸出輸出”hello world!”和換行符,因為標準輸出為控制枱的時候緩衝是“行緩衝”,所以換行符導致buffer中的內容清空,這就對應第二節中ET模式下寫就緒的第二種情況——當有舊數據被髮送走時,即buffer中待寫的內容變少得時候會觸發fd狀態的改變。所以下次epoll_wait會返回寫就緒。之後重複這個過程一直循環下去。
我們再看程序五。
程序五
相對程序四這裏僅僅去掉了輸出的換行操作。即:
cout<<"hello world!";
運行結果如下:

image.png

我們看到程序成掛起狀態。因為第一次epoll_wait返回寫就緒後,程序向標準輸出的buffer中寫入“hello world!”,但是因為沒有輸出換行,所以buffer中的內容一直存在,下次epoll_wait的時候,雖然有寫空間但是ET模式下不再返回寫就緒。回憶第一節關於ET的實現,這種情況原因就是第一次buffer為空,導致epitem加入rdlist,返回一次就緒後移除此epitem,之後雖然buffer仍然可寫,但是由於對應epitem已經不再rdlist中,就不會對其就緒fd的events的在檢測了。
程序六
int main(void)
{

int epfd,nfds;
struct epoll_event ev,events[5];//ev用於註冊事件,數組用於返回要處理的事件
epfd=epoll_create(1);//只需要監聽一個描述符——標準輸出
ev.data.fd=STDOUT_FILENO;
ev.events=EPOLLOUT;//使用默認LT模式
epoll_ctl(epfd,EPOLL_CTL_ADD,STDOUT_FILENO,&ev);//註冊epoll事件
for(;;)

{

 nfds=epoll_wait(epfd,events,5,-1);
 for(int i=0;i<nfds;i++)
{
  if(events[i].data.fd==STDOUT_FILENO)
     cout<<"hello world!";
}

}
};

image.png

程序六相對程序五僅僅是修改ET模式為默認的LT模式,我們發現程序再次死循環。這時候原因已經很清楚了,因為當向buffer寫入”hello world!”後,雖然buffer沒有輸出清空,但是LT模式下只有buffer有寫空間就返回寫就緒,所以會一直輸出”hello world!”,當buffer滿的時候,buffer會自動刷清輸出,同樣會造成epoll_wait返回寫就緒。
程序七
int main(void)

{

int epfd,nfds;

struct epoll_event ev,events[5];//ev用於註冊事件,數組用於返回要處理的事件

epfd=epoll_create(1);//只需要監聽一個描述符——標準輸出

ev.data.fd=STDOUT_FILENO;

ev.events=EPOLLOUT|EPOLLET;//監聽讀狀態同時設置ET模式

epoll_ctl(epfd,EPOLL_CTL_ADD,STDOUT_FILENO,&ev);//註冊epoll事件

for(;;)

{

 nfds=epoll_wait(epfd,events,5,-1);

 for(int i=0;i<nfds;i++)

{

   if(events[i].data.fd==STDOUT_FILENO)

       cout<<"hello world!";

   ev.data.fd=STDOUT_FILENO;

   ev.events=EPOLLOUT|EPOLLET;

   epoll_ctl(epfd,EPOLL_CTL_MOD,STDOUT_FILENO,&ev); //重新MOD事件(ADD無效)

}

}

};
image.png

程序七相對於程序五在每次向標準輸出的buffer輸出”hello world!”後,重新MOD OUT事件。所以相當於每次重新進行第一節中紅線描述的途徑返回就緒,導致程序循環輸出。
ET模式下的讀寫
經過前面幾節分析,我們可以知道,當epoll工作在ET模式下時,對於讀操作,如果read一次沒有讀盡buffer中的數據,那麼下次將得不到讀就緒的通知,造成buffer中已有的數據無機會讀出,除非有新的數據再次到達。對於寫操作,主要是因為ET模式下fd通常為非阻塞造成的一個問題——如何保證將用户要求寫的數據寫完。
要解決上述兩個ET模式下的讀寫問題,我們必須實現:
a. 對於讀,只要buffer中還有數據就一直讀;
b. 對於寫,只要buffer還有空間且用户請求寫的數據還未寫完,就一直寫。
要實現上述a、b兩個效果,我們有兩種方法解決。
方法一
(1) 每次讀入操作後(read,recv),用户主動epoll_mod IN事件,此時只要該fd的緩衝還有數據可以讀,則epoll_wait會返回讀就緒。
(2) 每次輸出操作後(write,send),用户主動epoll_mod OUT事件,此時只要該該fd的緩衝可以發送數據(發送buffer不滿),則epoll_wait就會返回寫就緒(有時候採用該機制通知epoll_wai醒過來)。
這個方法的原理我們在之前討論過:當buffer中有數據可讀(即buffer不空)且用户對相應fd進行epoll_mod IN事件時ET模式返回讀就緒,當buffer中有可寫空間(即buffer不滿)且用户對相應fd進行epoll_mod OUT事件時返回寫就緒。
所以得到如下解決方式:
if(events[i].events&EPOLLIN)//如果收到數據,那麼進行讀入

{

cout << "EPOLLIN" << endl;

sockfd = events[i].data.fd;

if ( (n = read(sockfd, line, MAXLINE))>0) 

{

line[n] = '/0';

    cout << "read " << line << endl;

if(n==MAXLINE)

{

ev.data.fd=sockfd;

ev.events=EPOLLIN|EPOLLET;

epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //數據還沒讀完,重新MOD IN事件

}

else

{

ev.data.fd=sockfd;

ev.events=EPOLLIN|EPOLLET;

epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //buffer中的數據已經讀取完畢MOD OUT事件

}

}

else if (n == 0)

{

close(sockfd);

}


}

else if(events[i].events&EPOLLOUT) // 如果有數據發送

{

sockfd = events[i].data.fd;

write(sockfd, line, n);

ev.data.fd=sockfd; //設置用於讀操作的文件描述符

ev.events=EPOLLIN|EPOLLET; //設置用於注測的讀操作事件

epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);  //修改sockfd上要處理的事件為EPOLIN

}
注:對於write操作,由於sockfd是工作在阻塞模式下的,所以沒有必要進行特殊處理,和LT使用一樣。
分析:這種方法存在幾個問題:
(1) 對於read操作後的判斷——if(n==MAXLINE),不能説明這種情況buffer就一定還有沒有讀完的數據,試想萬一buffer中一共就有MAXLINE字節數據呢?這樣繼續 MOD IN就不再得到通知,而也就沒有機會對相應sockfd MOD OUT。
(2) 那麼如果服務端用其他方式能夠在適當時機對相應的sockfd MOD OUT,是否這種方法就可取呢?我們首先思考一下為什麼要用ET模式,因為ET模式能夠減少epoll_wait等系統調用,而我們在這裏每次read後都要MOD IN,之後又要epoll_wait,勢必造成效率降低,這不是適得其反嗎?
綜上,此方式不應該使用。
l 方法二
讀: 只要可讀, 就一直讀, 直到返回 0, 或者 errno = EAGAIN
寫: 只要可寫, 就一直寫, 直到數據發送完, 或者 errno = EAGAIN
if (events[i].events & EPOLLIN)

{

n = 0;

  while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) 

{

n += nread;

  }

if (nread == -1 && errno != EAGAIN)

{

perror("read error");

  }

  ev.data.fd = fd;

  ev.events = events[i].events | EPOLLOUT;

  epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

}

if (events[i].events & EPOLLOUT)

{

int nwrite, data_size = strlen(buf);

  n = data_size;

  while (n > 0) 

{

nwrite = write(fd, buf + data_size - n, n);

      if (nwrite < n) 

{

         if (nwrite == -1 && errno != EAGAIN) 

{

perror("write error");

         }

         break;

       }

      n -= nwrite;

    }

ev.data.fd=fd; 

ev.events=EPOLLIN|EPOLLET; 

epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);  //修改sockfd上要處理的事件為EPOLIN

}
注:使用這種方式一定要使每個連接的套接字工作於非阻塞模式,因為讀寫需要一直讀或寫直到出錯(對於讀,當讀到的實際字節數小於請求字節數時就可以停止),而如果你的文件描述符如果不是非阻塞的,那這個一直讀或一直寫勢必會在最後一次阻塞。這樣就不能在阻塞在epoll_wait上了,造成其他文件描述符的任務餓死。
綜上:方法一不適合使用,我們只能使用方法二,所以也就常説“ET需要工作在非阻塞模式”,當然這並不能説明ET不能工作在阻塞模式,而是工作在阻塞模式可能在運行中會出現一些問題。
l 方法三
仔細分析方法二的寫操作,我們發現這種方式並不很完美,因為寫操作返回EAGAIN就終止寫,但是返回EAGAIN只能説名當前buffer已滿不可寫,並不能保證用户(或服務端)要求寫的數據已經寫完。那麼如何保證對非阻塞的套接字寫夠請求的字節數才返回呢(阻塞的套接字直到將請求寫的字節數寫完才返回)?
我們需要封裝socket_write()的函數用來處理這種情況,該函數會盡量將數據寫完再返回,返回-1表示出錯。在socket_write()內部,當寫緩衝已滿(send()返回-1,且errno為EAGAIN),那麼會等待後再重試.
ssize_t socket_write(int sockfd, const char* buffer, size_t buflen)

{

ssize_t tmp;

size_t total = buflen;

const char* p = buffer;

while(1)

{

tmp = write(sockfd, p, total);

if(tmp < 0)

{

  // 當send收到信號時,可以繼續寫,但這裏返回-1.

  if(errno == EINTR)

    return -1;

  // 當socket是非阻塞時,如返回此錯誤,表示寫緩衝隊列已滿,

  // 在這裏做延時後再重試.

  if(errno == EAGAIN)

  {

    usleep(1000);

    continue;

  }

  return -1;

}

if((size_t)tmp == total)

    return buflen;

 total -= tmp;

 p += tmp;

}

return tmp;//返回已寫字節數

}
分析:這種方式也存在問題,因為在理論上可能會長時間的阻塞在socket_write()內部(buffer中的數據得不到發送,一直返回EAGAIN),但暫沒有更好的辦法。
不過看到這種方式時,我在想在socket_write中將sockfd改為阻塞模式應該一樣可行,等再次epoll_wait之前再將其改為非阻塞。

【文章福利】:小編整理了一些個人覺得比較好的學習書籍、視頻資料共享在羣文件裏面,有需要的可以自行添加哦!~點擊加入(需要自取)
image.png

5.2 ET模式下的accept

考慮這種情況:多個連接同時到達,服務器的 TCP 就緒隊列瞬間積累多個就緒

連接,由於是邊緣觸發模式,epoll 只會通知一次,accept 只處理一個連接,導致 TCP 就緒隊列中剩下的連接都得不到處理。
解決辦法是用 while 循環抱住 accept 調用,處理完 TCP 就緒隊列中的所有連接後再退出循環。如何知道是否處理完就緒隊列中的所有連接呢? accept 返回 -1 並且 errno 設置為 EAGAIN 就表示所有連接都處理完。
的正確使用方式為:
while ((conn_sock = accept(listenfd,(struct sockaddr ) &remote, (size_t )&addrlen)) > 0) {

handle_client(conn_sock);   

}

if (conn_sock == -1) {

 if (errno != EAGAIN && errno != ECONNABORTED    

        && errno != EPROTO && errno != EINTR)    

    perror("accept");   

}
原因:如果accept工作在阻塞模式,考慮這種情況: TCP 連接被客户端夭折,即在服務器調用 accept 之前(此時select等已經返回連接到達讀就緒),客户端主動發送 RST 終止連接,導致剛剛建立的連接從就緒隊列中移出,如果套接口被設置成阻塞模式,服務器就會一直阻塞在 accept 調用上,直到其他某個客户建立一個新的連接為止。但是在此期間,服務器單純地阻塞在accept 調用上(實際應該阻塞在select上),就緒隊列中的其他描述符都得不到處理。

解決辦法是把監聽套接口設置為非阻塞, 當客户在服務器調用 accept 之前中止

某個連接時,accept 調用可以立即返回 -1, 這時源自 Berkeley 的實現會在內核中處理該事件,並不會將該事件通知給 epoll,而其他實現把 errno 設置為 ECONNABORTED 或者 EPROTO 錯誤,我們應該忽略這兩個錯誤。(具體可參看UNP v1 p363)
6.1 ET模式為什麼要設置在非阻塞模式下工作
因為ET模式下的讀寫需要一直讀或寫直到出錯(對於讀,當讀到的實際字節數小於請求字節數時就可以停止),而如果你的文件描述符如果不是非阻塞的,那這個一直讀或一直寫勢必會在最後一次阻塞。這樣就不能在阻塞在epoll_wait上了,造成其他文件描述符的任務餓死。
6.2 使用ET和LT的區別
LT:水平觸發,效率會低於ET觸發,尤其在大併發,大流量的情況下。但是LT對代碼編寫要求比較低,不容易出現問題。LT模式服務編寫上的表現是:只要有數據沒有被獲取,內核就不斷通知你,因此不用擔心事件丟失的情況。
ET:邊緣觸發,效率非常高,在併發,大流量的情況下,會比LT少很多epoll的系統調用,因此效率高。但是對編程要求高,需要細緻的處理每個請求,否則容易發生丟失事件的情況。

下面舉一個列子來説明LT和ET的區別(都是非阻塞模式,阻塞就不説了,效率太低):
採用LT模式下,如果accept調用有返回就可以馬上建立當前這個連接了,再epoll_wait等待下次通知,和select一樣。
但是對於ET而言,如果accpet調用有返回,除了建立當前這個連接外,不能馬上就epoll_wait還需要繼續循環accpet,直到返回-1,且errno==EAGAIN,
從本質上講:與LT相比,ET模型是通過減少系統調用來達到提高並行效率的。
6.3 一道騰訊後台開發的面試題

使用Linux epoll模型,水平(LT)觸發模式,當socket可寫時,會不停的觸發socket可寫的事件,如何處理?
第一種最普遍的方式:
需要向socket寫數據的時候才把socket加入epoll,等待可寫事件。接受到可寫事件後,調用write或者send發送數據。當所有數據都寫完後,把socket移出epoll。
這種方式的缺點是,即使發送很少的數據,也要把socket加入epoll,寫完後在移出epoll,有一定操作代價。
一種改進的方式:
開始不把socket加入epoll,需要向socket寫數據的時候,直接調用write或者send發送數據。如果返回EAGAIN,把socket加入epoll,在epoll的驅動下寫數據,全部數據發送完畢後,再移出epoll。
這種方式的優點是:數據不多的時候可以避免epoll的事件處理,提高效率。
6.4什麼情況下用ET
很簡單,當你想提高程序效率的時候。

最後附一個epoll實例:


#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <fcntl.h>
#include <errno.h>

#define MAX_EVENTS 10
#define PORT 8080

//設置socket連接為非阻塞模式
void setnonblocking(int sockfd) {
    int opts;
    opts = fcntl(sockfd, F_GETFL);
    if(opts < 0) {
        perror("fcntl(F_GETFL)\n");
        exit(1);
    }
    opts = (opts | O_NONBLOCK);
    if(fcntl(sockfd, F_SETFL, opts) < 0) {
        perror("fcntl(F_SETFL)\n");
        exit(1);
    }
}

int main(){
    struct epoll_event ev, events[MAX_EVENTS]; //ev負責添加事件,events接收返回事件
    int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;
    struct sockaddr_in local, remote;
    char buf[BUFSIZ];

    //創建listen socket
    if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("sockfd\n");
        exit(1);
    }
    setnonblocking(listenfd);//listenfd設置為非阻塞[1]
    bzero(&local, sizeof(local));
    local.sin_family = AF_INET;
    local.sin_addr.s_addr = htonl(INADDR_ANY);;
    local.sin_port = htons(PORT);
    if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) {
        perror("bind\n");
        exit(1);
    }
    listen(listenfd, 20);

    epfd = epoll_create(MAX_EVENTS);
    if (epfd == -1) {
        perror("epoll_create");
        exit(EXIT_FAILURE);
    }

    ev.events = EPOLLIN;
    ev.data.fd = listenfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {//監聽listenfd
        perror("epoll_ctl: listen_sock");
        exit(EXIT_FAILURE);
    }

    for (;;) {
        nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_pwait");
            exit(EXIT_FAILURE);
        }

        for (i = 0; i < nfds; ++i) {
            fd = events[i].data.fd;
            if (fd == listenfd) {
                while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,
                                (size_t *)&addrlen)) > 0) {
                    setnonblocking(conn_sock);//下面設置ET模式,所以要設置非阻塞
                    ev.events = EPOLLIN | EPOLLET;
                    ev.data.fd = conn_sock;
                    if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {//讀監聽
                        perror("epoll_ctl: add"); //連接套接字
                        exit(EXIT_FAILURE);
                    }
                }
                if (conn_sock == -1) {
                    if (errno != EAGAIN && errno != ECONNABORTED
                            && errno != EPROTO && errno != EINTR)
                        perror("accept");
                }
                continue;
            }
            if (events[i].events & EPOLLIN) {
                n = 0;
                while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {//ET下可以讀就一直讀
                    n += nread;
                }
                if (nread == -1 && errno != EAGAIN) {
                    perror("read error");
                }
                ev.data.fd = fd;
                ev.events = events[i].events | EPOLLOUT; //MOD OUT
                if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
                    perror("epoll_ctl: mod");
                }
            }
            if (events[i].events & EPOLLOUT) {
              sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11);
                int nwrite, data_size = strlen(buf);
                n = data_size;
                while (n > 0) {
                    nwrite = write(fd, buf + data_size - n, n);//ET下一直將要寫數據寫完
                    if (nwrite < n) {
                        if (nwrite == -1 && errno != EAGAIN) {
                            perror("write error");
                        }
                        break;
                    }
                    n -= nwrite;
                }
                close(fd);
            }
        }
    }
    return 0;
}
user avatar luguolangren 頭像 yefengweiliang 頭像
2 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.