相關視頻推薦
面試中正經“八股文”網絡原理tcp/udp,網絡編程epoll/reactor
epoll 原理剖析 以及 reactor 模型應用
epoll原理剖析以及三握四揮的處理
LinuxC++後台服務器開發架構師免費學習地址
徹底學會使用epoll(一)——ET模式實現分析
接上一篇
首先,看程序四的例子。
l 程序四
#include <unistd.h>
#include <iostream>
#include <sys/epoll.h>
using namespace std;
int main(void)
{
int epfd,nfds;
struct epoll_event ev,events[5];//ev用於註冊事件,數組用於返回要處理的事件
epfd=epoll_create(1);//只需要監聽一個描述符——標準輸出
ev.data.fd=STDOUT_FILENO;
ev.events=EPOLLOUT|EPOLLET;//監聽讀狀態同時設置ET模式
epoll_ctl(epfd,EPOLL_CTL_ADD,STDOUT_FILENO,&ev);//註冊epoll事件
for(;;)
{
nfds=epoll_wait(epfd,events,5,-1);
for(int i=0;i<nfds;i++)
{
if(events[i].data.fd==STDOUT_FILENO)
cout<<"hello world!"<<endl;
}
}
};
這個程序的功能是隻要標準輸出寫就緒,就輸出“hello world!”。
運行結果:
我們發現這將是一個死循環。下面具體分析一下這個程序的執行過程:
(1) 首先初始buffer為空,buffer中有空間可寫,這時無論是ET還是LT都會將對應的epitem加入rdlist(對應第一節圖中的紅線),導致epoll_wait就返回寫就緒。
(2) 程序想標準輸出輸出”hello world!”和換行符,因為標準輸出為控制枱的時候緩衝是“行緩衝”,所以換行符導致buffer中的內容清空,這就對應第二節中ET模式下寫就緒的第二種情況——當有舊數據被髮送走時,即buffer中待寫的內容變少得時候會觸發fd狀態的改變。所以下次epoll_wait會返回寫就緒。之後重複這個過程一直循環下去。
我們再看程序五。
程序五
相對程序四這裏僅僅去掉了輸出的換行操作。即:
cout<<"hello world!";
運行結果如下:
我們看到程序成掛起狀態。因為第一次epoll_wait返回寫就緒後,程序向標準輸出的buffer中寫入“hello world!”,但是因為沒有輸出換行,所以buffer中的內容一直存在,下次epoll_wait的時候,雖然有寫空間但是ET模式下不再返回寫就緒。回憶第一節關於ET的實現,這種情況原因就是第一次buffer為空,導致epitem加入rdlist,返回一次就緒後移除此epitem,之後雖然buffer仍然可寫,但是由於對應epitem已經不再rdlist中,就不會對其就緒fd的events的在檢測了。
程序六
int main(void)
{
int epfd,nfds;
struct epoll_event ev,events[5];//ev用於註冊事件,數組用於返回要處理的事件
epfd=epoll_create(1);//只需要監聽一個描述符——標準輸出
ev.data.fd=STDOUT_FILENO;
ev.events=EPOLLOUT;//使用默認LT模式
epoll_ctl(epfd,EPOLL_CTL_ADD,STDOUT_FILENO,&ev);//註冊epoll事件
for(;;)
{
nfds=epoll_wait(epfd,events,5,-1);
for(int i=0;i<nfds;i++)
{
if(events[i].data.fd==STDOUT_FILENO)
cout<<"hello world!";
}
}
};
程序六相對程序五僅僅是修改ET模式為默認的LT模式,我們發現程序再次死循環。這時候原因已經很清楚了,因為當向buffer寫入”hello world!”後,雖然buffer沒有輸出清空,但是LT模式下只有buffer有寫空間就返回寫就緒,所以會一直輸出”hello world!”,當buffer滿的時候,buffer會自動刷清輸出,同樣會造成epoll_wait返回寫就緒。
程序七
int main(void)
{
int epfd,nfds;
struct epoll_event ev,events[5];//ev用於註冊事件,數組用於返回要處理的事件
epfd=epoll_create(1);//只需要監聽一個描述符——標準輸出
ev.data.fd=STDOUT_FILENO;
ev.events=EPOLLOUT|EPOLLET;//監聽讀狀態同時設置ET模式
epoll_ctl(epfd,EPOLL_CTL_ADD,STDOUT_FILENO,&ev);//註冊epoll事件
for(;;)
{
nfds=epoll_wait(epfd,events,5,-1);
for(int i=0;i<nfds;i++)
{
if(events[i].data.fd==STDOUT_FILENO)
cout<<"hello world!";
ev.data.fd=STDOUT_FILENO;
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,STDOUT_FILENO,&ev); //重新MOD事件(ADD無效)
}
}
};
程序七相對於程序五在每次向標準輸出的buffer輸出”hello world!”後,重新MOD OUT事件。所以相當於每次重新進行第一節中紅線描述的途徑返回就緒,導致程序循環輸出。
ET模式下的讀寫
經過前面幾節分析,我們可以知道,當epoll工作在ET模式下時,對於讀操作,如果read一次沒有讀盡buffer中的數據,那麼下次將得不到讀就緒的通知,造成buffer中已有的數據無機會讀出,除非有新的數據再次到達。對於寫操作,主要是因為ET模式下fd通常為非阻塞造成的一個問題——如何保證將用户要求寫的數據寫完。
要解決上述兩個ET模式下的讀寫問題,我們必須實現:
a. 對於讀,只要buffer中還有數據就一直讀;
b. 對於寫,只要buffer還有空間且用户請求寫的數據還未寫完,就一直寫。
要實現上述a、b兩個效果,我們有兩種方法解決。
方法一
(1) 每次讀入操作後(read,recv),用户主動epoll_mod IN事件,此時只要該fd的緩衝還有數據可以讀,則epoll_wait會返回讀就緒。
(2) 每次輸出操作後(write,send),用户主動epoll_mod OUT事件,此時只要該該fd的緩衝可以發送數據(發送buffer不滿),則epoll_wait就會返回寫就緒(有時候採用該機制通知epoll_wai醒過來)。
這個方法的原理我們在之前討論過:當buffer中有數據可讀(即buffer不空)且用户對相應fd進行epoll_mod IN事件時ET模式返回讀就緒,當buffer中有可寫空間(即buffer不滿)且用户對相應fd進行epoll_mod OUT事件時返回寫就緒。
所以得到如下解決方式:
if(events[i].events&EPOLLIN)//如果收到數據,那麼進行讀入
{
cout << "EPOLLIN" << endl;
sockfd = events[i].data.fd;
if ( (n = read(sockfd, line, MAXLINE))>0)
{
line[n] = '/0';
cout << "read " << line << endl;
if(n==MAXLINE)
{
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //數據還沒讀完,重新MOD IN事件
}
else
{
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //buffer中的數據已經讀取完畢MOD OUT事件
}
}
else if (n == 0)
{
close(sockfd);
}
}
else if(events[i].events&EPOLLOUT) // 如果有數據發送
{
sockfd = events[i].data.fd;
write(sockfd, line, n);
ev.data.fd=sockfd; //設置用於讀操作的文件描述符
ev.events=EPOLLIN|EPOLLET; //設置用於注測的讀操作事件
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改sockfd上要處理的事件為EPOLIN
}
注:對於write操作,由於sockfd是工作在阻塞模式下的,所以沒有必要進行特殊處理,和LT使用一樣。
分析:這種方法存在幾個問題:
(1) 對於read操作後的判斷——if(n==MAXLINE),不能説明這種情況buffer就一定還有沒有讀完的數據,試想萬一buffer中一共就有MAXLINE字節數據呢?這樣繼續 MOD IN就不再得到通知,而也就沒有機會對相應sockfd MOD OUT。
(2) 那麼如果服務端用其他方式能夠在適當時機對相應的sockfd MOD OUT,是否這種方法就可取呢?我們首先思考一下為什麼要用ET模式,因為ET模式能夠減少epoll_wait等系統調用,而我們在這裏每次read後都要MOD IN,之後又要epoll_wait,勢必造成效率降低,這不是適得其反嗎?
綜上,此方式不應該使用。
l 方法二
讀: 只要可讀, 就一直讀, 直到返回 0, 或者 errno = EAGAIN
寫: 只要可寫, 就一直寫, 直到數據發送完, 或者 errno = EAGAIN
if (events[i].events & EPOLLIN)
{
n = 0;
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0)
{
n += nread;
}
if (nread == -1 && errno != EAGAIN)
{
perror("read error");
}
ev.data.fd = fd;
ev.events = events[i].events | EPOLLOUT;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
if (events[i].events & EPOLLOUT)
{
int nwrite, data_size = strlen(buf);
n = data_size;
while (n > 0)
{
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n)
{
if (nwrite == -1 && errno != EAGAIN)
{
perror("write error");
}
break;
}
n -= nwrite;
}
ev.data.fd=fd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev); //修改sockfd上要處理的事件為EPOLIN
}
注:使用這種方式一定要使每個連接的套接字工作於非阻塞模式,因為讀寫需要一直讀或寫直到出錯(對於讀,當讀到的實際字節數小於請求字節數時就可以停止),而如果你的文件描述符如果不是非阻塞的,那這個一直讀或一直寫勢必會在最後一次阻塞。這樣就不能在阻塞在epoll_wait上了,造成其他文件描述符的任務餓死。
綜上:方法一不適合使用,我們只能使用方法二,所以也就常説“ET需要工作在非阻塞模式”,當然這並不能説明ET不能工作在阻塞模式,而是工作在阻塞模式可能在運行中會出現一些問題。
l 方法三
仔細分析方法二的寫操作,我們發現這種方式並不很完美,因為寫操作返回EAGAIN就終止寫,但是返回EAGAIN只能説名當前buffer已滿不可寫,並不能保證用户(或服務端)要求寫的數據已經寫完。那麼如何保證對非阻塞的套接字寫夠請求的字節數才返回呢(阻塞的套接字直到將請求寫的字節數寫完才返回)?
我們需要封裝socket_write()的函數用來處理這種情況,該函數會盡量將數據寫完再返回,返回-1表示出錯。在socket_write()內部,當寫緩衝已滿(send()返回-1,且errno為EAGAIN),那麼會等待後再重試.
ssize_t socket_write(int sockfd, const char* buffer, size_t buflen)
{
ssize_t tmp;
size_t total = buflen;
const char* p = buffer;
while(1)
{
tmp = write(sockfd, p, total);
if(tmp < 0)
{
// 當send收到信號時,可以繼續寫,但這裏返回-1.
if(errno == EINTR)
return -1;
// 當socket是非阻塞時,如返回此錯誤,表示寫緩衝隊列已滿,
// 在這裏做延時後再重試.
if(errno == EAGAIN)
{
usleep(1000);
continue;
}
return -1;
}
if((size_t)tmp == total)
return buflen;
total -= tmp;
p += tmp;
}
return tmp;//返回已寫字節數
}
分析:這種方式也存在問題,因為在理論上可能會長時間的阻塞在socket_write()內部(buffer中的數據得不到發送,一直返回EAGAIN),但暫沒有更好的辦法。
不過看到這種方式時,我在想在socket_write中將sockfd改為阻塞模式應該一樣可行,等再次epoll_wait之前再將其改為非阻塞。
【文章福利】:小編整理了一些個人覺得比較好的學習書籍、視頻資料共享在羣文件裏面,有需要的可以自行添加哦!~點擊加入(需要自取)
5.2 ET模式下的accept
考慮這種情況:多個連接同時到達,服務器的 TCP 就緒隊列瞬間積累多個就緒
連接,由於是邊緣觸發模式,epoll 只會通知一次,accept 只處理一個連接,導致 TCP 就緒隊列中剩下的連接都得不到處理。
解決辦法是用 while 循環抱住 accept 調用,處理完 TCP 就緒隊列中的所有連接後再退出循環。如何知道是否處理完就緒隊列中的所有連接呢? accept 返回 -1 並且 errno 設置為 EAGAIN 就表示所有連接都處理完。
的正確使用方式為:
while ((conn_sock = accept(listenfd,(struct sockaddr ) &remote, (size_t )&addrlen)) > 0) {
handle_client(conn_sock);
}
if (conn_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
perror("accept");
}
原因:如果accept工作在阻塞模式,考慮這種情況: TCP 連接被客户端夭折,即在服務器調用 accept 之前(此時select等已經返回連接到達讀就緒),客户端主動發送 RST 終止連接,導致剛剛建立的連接從就緒隊列中移出,如果套接口被設置成阻塞模式,服務器就會一直阻塞在 accept 調用上,直到其他某個客户建立一個新的連接為止。但是在此期間,服務器單純地阻塞在accept 調用上(實際應該阻塞在select上),就緒隊列中的其他描述符都得不到處理。
解決辦法是把監聽套接口設置為非阻塞, 當客户在服務器調用 accept 之前中止
某個連接時,accept 調用可以立即返回 -1, 這時源自 Berkeley 的實現會在內核中處理該事件,並不會將該事件通知給 epoll,而其他實現把 errno 設置為 ECONNABORTED 或者 EPROTO 錯誤,我們應該忽略這兩個錯誤。(具體可參看UNP v1 p363)
6.1 ET模式為什麼要設置在非阻塞模式下工作
因為ET模式下的讀寫需要一直讀或寫直到出錯(對於讀,當讀到的實際字節數小於請求字節數時就可以停止),而如果你的文件描述符如果不是非阻塞的,那這個一直讀或一直寫勢必會在最後一次阻塞。這樣就不能在阻塞在epoll_wait上了,造成其他文件描述符的任務餓死。
6.2 使用ET和LT的區別
LT:水平觸發,效率會低於ET觸發,尤其在大併發,大流量的情況下。但是LT對代碼編寫要求比較低,不容易出現問題。LT模式服務編寫上的表現是:只要有數據沒有被獲取,內核就不斷通知你,因此不用擔心事件丟失的情況。
ET:邊緣觸發,效率非常高,在併發,大流量的情況下,會比LT少很多epoll的系統調用,因此效率高。但是對編程要求高,需要細緻的處理每個請求,否則容易發生丟失事件的情況。
下面舉一個列子來説明LT和ET的區別(都是非阻塞模式,阻塞就不説了,效率太低):
採用LT模式下,如果accept調用有返回就可以馬上建立當前這個連接了,再epoll_wait等待下次通知,和select一樣。
但是對於ET而言,如果accpet調用有返回,除了建立當前這個連接外,不能馬上就epoll_wait還需要繼續循環accpet,直到返回-1,且errno==EAGAIN,
從本質上講:與LT相比,ET模型是通過減少系統調用來達到提高並行效率的。
6.3 一道騰訊後台開發的面試題
使用Linux epoll模型,水平(LT)觸發模式,當socket可寫時,會不停的觸發socket可寫的事件,如何處理?
第一種最普遍的方式:
需要向socket寫數據的時候才把socket加入epoll,等待可寫事件。接受到可寫事件後,調用write或者send發送數據。當所有數據都寫完後,把socket移出epoll。
這種方式的缺點是,即使發送很少的數據,也要把socket加入epoll,寫完後在移出epoll,有一定操作代價。
一種改進的方式:
開始不把socket加入epoll,需要向socket寫數據的時候,直接調用write或者send發送數據。如果返回EAGAIN,把socket加入epoll,在epoll的驅動下寫數據,全部數據發送完畢後,再移出epoll。
這種方式的優點是:數據不多的時候可以避免epoll的事件處理,提高效率。
6.4什麼情況下用ET
很簡單,當你想提高程序效率的時候。
最後附一個epoll實例:
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <fcntl.h>
#include <errno.h>
#define MAX_EVENTS 10
#define PORT 8080
//設置socket連接為非阻塞模式
void setnonblocking(int sockfd) {
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0) {
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0) {
perror("fcntl(F_SETFL)\n");
exit(1);
}
}
int main(){
struct epoll_event ev, events[MAX_EVENTS]; //ev負責添加事件,events接收返回事件
int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;
struct sockaddr_in local, remote;
char buf[BUFSIZ];
//創建listen socket
if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("sockfd\n");
exit(1);
}
setnonblocking(listenfd);//listenfd設置為非阻塞[1]
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_addr.s_addr = htonl(INADDR_ANY);;
local.sin_port = htons(PORT);
if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) {
perror("bind\n");
exit(1);
}
listen(listenfd, 20);
epfd = epoll_create(MAX_EVENTS);
if (epfd == -1) {
perror("epoll_create");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listenfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {//監聽listenfd
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_pwait");
exit(EXIT_FAILURE);
}
for (i = 0; i < nfds; ++i) {
fd = events[i].data.fd;
if (fd == listenfd) {
while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,
(size_t *)&addrlen)) > 0) {
setnonblocking(conn_sock);//下面設置ET模式,所以要設置非阻塞
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {//讀監聽
perror("epoll_ctl: add"); //連接套接字
exit(EXIT_FAILURE);
}
}
if (conn_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
perror("accept");
}
continue;
}
if (events[i].events & EPOLLIN) {
n = 0;
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {//ET下可以讀就一直讀
n += nread;
}
if (nread == -1 && errno != EAGAIN) {
perror("read error");
}
ev.data.fd = fd;
ev.events = events[i].events | EPOLLOUT; //MOD OUT
if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
perror("epoll_ctl: mod");
}
}
if (events[i].events & EPOLLOUT) {
sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11);
int nwrite, data_size = strlen(buf);
n = data_size;
while (n > 0) {
nwrite = write(fd, buf + data_size - n, n);//ET下一直將要寫數據寫完
if (nwrite < n) {
if (nwrite == -1 && errno != EAGAIN) {
perror("write error");
}
break;
}
n -= nwrite;
}
close(fd);
}
}
}
return 0;
}