使用IO多路复用器epoll实现TCP服务器
- 一、前言
- 二、新增使用API函数
- 2.1、epoll_create()函数
- 2.2、epoll_ctl()函数
- 2.3、struct epoll_event结构体
- 2.4、epoll_wait()函数
- 三、实现步骤
- 四、完整代码
- 五、TCP客户端
- 5.1、自己实现一个TCP客户端
- 5.2、Windows下可以使用NetAssist的网络助手工具
- 小结
一、前言
手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。
为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。
本节,在上一章节的基础上,将IO多路复用机制select改为更高效的IO多路复用机制epoll,使用epoll管理每个新接入的客户端连接,实现发送和接收。
epoll是Linux内核中一种可扩展的IO事件处理机制,可替代select和poll的系统调用。处理百万级并发访问性能更佳。
select的局限性:
(1) 文件描述符越多,性能越差。 单个进程中能够监视的文件描述符存在最大的数量,默认是1024(在linux内核头文件中定义有 #define _FD_SETSIZE 1024),当然也可以修改,但是文件描述符数量越多,性能越差。
(2)开销巨大 ,select需要复制大量的句柄数据结构,产生了巨大的开销(内核/用户空间内存拷贝问题)。
(3)select需要遍历整个句柄数组才能知道哪些句柄有事件。
(4)如果没有完成对一个已经就绪的文件描述符的IO操作,那么每次调用select还是会将这些文件描述符通知进程,即水平触发。
(5)poll使用链表保存监视的文件描述符,虽然没有了监视文件数量的限制,但是其他缺点依旧存在。
由于以上缺点,基于select模型的服务器程序,要达到十万以上的并发访问,是很难完成的。因此,epoll出场了。
二、新增使用API函数
2.1、epoll_create()函数
函数原型:
#include <sys/epoll.h>int epoll_create(int size);
功能:创建epoll的文件描述符。
参数说明:size表示内核需要监控的最大数量,但是这个参数内核已经不会用到,只要传入一个大于0的值即可。 当size<=0时,会直接返回不可用,这是历史原因保留下来的,最早的epoll_create是需要定义一次性就绪的最大数量;后来使用了链表以便便维护和扩展,就不再需要使用传入的参数。
返回:返回该对象的描述符,注意要使用 close 关闭该描述符。
2.2、epoll_ctl()函数
函数原型:
#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// epoll_ctl对应系统调用sys_epoll_ctl
功能:操作epoll的文件描述符,主要是对epoll的红黑树节点进行操作,比如节点的增删改查。
参数说明:
参数 | 含义 |
---|---|
epfd | 通过 epoll_create 创建的文件描述符 |
op | 对红黑树的操作,比如节点的增加、修改、删除,分别对应EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL |
fd | 需要添加监听的文件描述符 |
event | 事件信息 |
注意:epoll_ctl是非阻塞的,不会被挂起。
2.3、struct epoll_event结构体
struct epoll_event结构体原型:
typedef union epoll_data{ void* ptr; int fd; uint32_t u32; uint64_t u64};struct epoll_event{ uint32_t events; epoll_data_t data;}
events成员代表要监听的epoll事件类型
events成员:
成员变量 | 含义 |
---|---|
EPOLLIN | 监听fd的读事件 |
EPOLLOUT | 监听fd的写事件 |
EPOLLRI | 监听紧急数据可读事件(带外数据到来) |
EPOLLRDHUP | 监听套接字关闭或半关闭事件 |
EPOLLET | 将EPOLL设为边缘触发(Edge Triggered)模式 |
data成员:
data 成员时一个联合体类型,可以在调用 epoll_ctl 给 fd 添加/修改描述符监听的事件时携带一些数据,方便后面的epoll_wait可以取出信息使用。
2.4、epoll_wait()函数
函数原型:
#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
功能:阻塞一段时间,等待事件发生
返回:返回事件数量,事件集添加到events数组中。也就是遍历红黑树中的双向链表,把双向链表中的节点数据拷贝出来,拷贝完毕后把节点从双向链表中移除。
返回值 | 含义 |
---|---|
大于0 | 事件个数 |
等于0 | 超时时间timeout到了 |
小于0 | 出错,可通过errno查看出错原因 |
参数说明:
参数 | 含义 |
---|---|
epfd | 通过 epoll_create 创建的文件描述符 |
events | 存放就绪的事件集合,是输出参数 |
maxevents | 最大可存放事件数量,events数组大小 |
timeout | 阻塞等待的时间长短,单位是毫秒,-1表示一直阻塞等待 |
三、实现步骤
epoll的优点:
- 不需要轮询所有的文件描述符。
- 每次取就绪集合,都在固定位置。
- 事件的就绪和IO触发可以异步解耦。
(1)创建socket。
int listenfd=socket(AF_INET,SOCK_STREAM,0);if(listenfd==-1){ printf("errno = %d, %s/n",errno,strerror(errno)); return SOCKET_CREATE_FAILED;}
(2)绑定地址。
struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family=AF_INET;server.sin_addr.s_addr=htonl(INADDR_ANY);server.sin_port=htons(LISTEN_PORT);if(-1==bind(listenfd,(struct sockaddr*)&server,sizeof(server))){ printf("errno = %d, %s/n",errno,strerror(errno)); close(listenfd); return SOCKET_BIND_FAILED;}
(3)设置监听。
if(-1==listen(listenfd,BLOCK_SIZE)){ printf("errno = %d, %s/n",errno,strerror(errno)); close(listenfd); return SOCKET_LISTEN_FAILED;}
(4)创建epoll。
int epfd=epoll_create(1);if(epfd==-1){ perror("epoll_create error"); return SOCKET_EPOLL_CREATE_FAILED;}
(5)添加listen fd 到epoll。
struct epoll_event ev;ev.events=EPOLLIN;ev.data.fd=listenfd;if(epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev)==-1){ perror("epoll_ctl error"); return SOCKET_EPOLL_CTL_FAILED;}
(6)监听事件。
int nready=epoll_wait(epfd,evs,EVENTS_LENGTH,-1);
(7)如果监听套接字有新连接请求,处理新连接。
int curfd=evs[i].data.fd;if(curfd==listenfd){ // accept struct sockaddr_in client; socklen_t clientlen=sizeof(client); int clientfd=accept(listenfd,(struct sockaddr*)&client,&clientlen); if(clientfd==-1) { perror("accept error"); continue; } //printf("client %s:%d connected/n", inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port)); printf("client %s:%d connected/n", inet_ntoa(client.sin_addr),ntohs(client.sin_port)); ev.events=EPOLLIN; ev.data.fd=clientfd; if(epoll_ctl(epfd,EPOLL_CTL_ADD,clientfd,&ev)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); }}
(8)处理客户端发来的数据和发送数据到客户端。
if(evs[i].events&EPOLLIN){ //read int ret=recv(curfd,rbuff,BUFFER_LENGTH,0); if(ret>0) { printf("recv from %d: %s/n",curfd,rbuff); rbuff[ret]='/0'; memcpy(wbuff,rbuff,BUFFER_LENGTH); ev.events=EPOLLOUT; ev.data.fd=curfd; if(epoll_ctl(epfd,EPOLL_CTL_MOD,curfd,&ev)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); } } else if(ret==0)// 连接关闭 { printf("client %d disconnected/n", evs[i].data.fd); // 将连接从epoll实例中删除 if(epoll_ctl(epfd, EPOLL_CTL_DEL, evs[i].data.fd, NULL)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); } close(evs[i].data.fd); } else if (ret == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { continue; // 数据已读完 } perror("read error"); break; } else{ printf("read error,unknow type %d/n",ret); }}else if(evs[i].events&EPOLLOUT){ //write send(curfd,wbuff,BUFFER_LENGTH,0); ev.events=EPOLLIN; ev.data.fd=curfd; if(epoll_ctl(epfd,EPOLL_CTL_MOD,curfd,&ev)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); }}
四、完整代码
#include <stdio.h>#include <sys/socket.h>#include <sys/types.h>#include <netinet/in.h>#include <arpa/inet.h>#include <errno.h>#include <string.h>#include <unistd.h>#include <stdlib.h>#include <sys/epoll.h>#define LISTEN_PORT 9999#define BLOCK_SIZE 10#define BUFFER_LENGTH 1024#define EVENTS_LENGTH 128enum ERROR_CODE{ SOCKET_CREATE_FAILED=-1, SOCKET_BIND_FAILED=-2, SOCKET_LISTEN_FAILED=-3, SOCKET_ACCEPT_FAILED=-4, SOCKET_SELECT_FAILED=-5, SOCKET_EPOLL_CREATE_FAILED=-6, SOCKET_EPOLL_CTL_FAILED=-7, SOCKET_EPOLL_WAIT_FAILED=-8};char rbuff[BUFFER_LENGTH] = { 0 };char wbuff[BUFFER_LENGTH] = { 0 };int main(int argc,char **argv){ // 1. int listenfd=socket(AF_INET,SOCK_STREAM,0); if(listenfd==-1){ printf("errno = %d, %s/n",errno,strerror(errno)); return SOCKET_CREATE_FAILED; } // 2. struct sockaddr_in server; memset(&server,0,sizeof(server)); server.sin_family=AF_INET; server.sin_addr.s_addr=htonl(INADDR_ANY); server.sin_port=htons(LISTEN_PORT); if(-1==bind(listenfd,(struct sockaddr*)&server,sizeof(server))){ printf("errno = %d, %s/n",errno,strerror(errno)); close(listenfd); return SOCKET_BIND_FAILED; } // 3. if(-1==listen(listenfd,BLOCK_SIZE)){ printf("errno = %d, %s/n",errno,strerror(errno)); close(listenfd); return SOCKET_LISTEN_FAILED; } printf("listen port: %d/n",LISTEN_PORT); // 4. int epfd=epoll_create(1); if(epfd==-1) { perror("epoll_create error"); return SOCKET_EPOLL_CREATE_FAILED; } struct epoll_event ev,evs[EVENTS_LENGTH]; ev.events=EPOLLIN; ev.data.fd=listenfd; if(epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev)==-1) { perror("epoll_ctl error"); return SOCKET_EPOLL_CTL_FAILED; } printf("start epoll_wait. epoll fd = %d/n",epfd); while(1) { int nready=epoll_wait(epfd,evs,EVENTS_LENGTH,-1); if (nready == -1) { perror("epoll_wait error"); exit(SOCKET_EPOLL_WAIT_FAILED); } for(int i=0;i<nready;i++) { int curfd=evs[i].data.fd; if(curfd==listenfd) { // accept struct sockaddr_in client; socklen_t clientlen=sizeof(client); int clientfd=accept(listenfd,(struct sockaddr*)&client,&clientlen); if(clientfd==-1) { perror("accept error"); continue; } //printf("client %s:%d connected/n", inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port)); printf("client %s:%d connected/n", inet_ntoa(client.sin_addr),ntohs(client.sin_port)); ev.events=EPOLLIN; ev.data.fd=clientfd; if(epoll_ctl(epfd,EPOLL_CTL_ADD,clientfd,&ev)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); } } else if(evs[i].events&EPOLLIN) { //read int ret=recv(curfd,rbuff,BUFFER_LENGTH,0); if(ret>0) { printf("recv from %d: %s/n",curfd,rbuff); rbuff[ret]='/0'; memcpy(wbuff,rbuff,BUFFER_LENGTH); ev.events=EPOLLOUT; ev.data.fd=curfd; if(epoll_ctl(epfd,EPOLL_CTL_MOD,curfd,&ev)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); } } else if(ret==0)// 连接关闭 { printf("client %d disconnected/n", evs[i].data.fd); // 将连接从epoll实例中删除 if(epoll_ctl(epfd, EPOLL_CTL_DEL, evs[i].data.fd, NULL)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); } close(evs[i].data.fd); } else if (ret == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { continue; // 数据已读完 } perror("read error"); break; } else{ printf("read error,unknow type %d/n",ret); } } else if(evs[i].events&EPOLLOUT) { //write send(curfd,wbuff,BUFFER_LENGTH,0); ev.events=EPOLLIN; ev.data.fd=curfd; if(epoll_ctl(epfd,EPOLL_CTL_MOD,curfd,&ev)==-1) { perror("epoll_ctl error"); exit(SOCKET_EPOLL_CTL_FAILED); } } } } close(listenfd); return 0;}
编译命令:
gcc -o server server.c
五、TCP客户端
5.1、自己实现一个TCP客户端
自己实现一个TCP客户端连接TCP服务器的代码:
#include <stdio.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <errno.h>#include <string.h>#include <unistd.h>#include <stdlib.h>#define BUFFER_LENGTH 1024enum ERROR_CODE{ SOCKET_CREATE_FAILED=-1, SOCKET_CONN_FAILED=-2, SOCKET_LISTEN_FAILED=-3, SOCKET_ACCEPT_FAILED=-4};int main(int argc,char** argv){ if(argc<3) { printf("Please enter the server IP and port."); return 0; } printf("connect to %s, port=%s/n",argv[1],argv[2]); int connfd=socket(AF_INET,SOCK_STREAM,0); if(connfd==-1) { printf("errno = %d, %s/n",errno,strerror(errno)); return SOCKET_CREATE_FAILED; } struct sockaddr_in serv; serv.sin_family=AF_INET; serv.sin_addr.s_addr=inet_addr(argv[1]); serv.sin_port=htons(atoi(argv[2])); socklen_t len=sizeof(serv); int rwfd=connect(connfd,(struct sockaddr*)&serv,len); if(rwfd==-1) { printf("errno = %d, %s/n",errno,strerror(errno)); close(rwfd); return SOCKET_CONN_FAILED; } int ret=1; while(ret>0) { char buf[BUFFER_LENGTH]={0}; printf("Please enter the string to send:/n"); scanf("%s",buf); send(connfd,buf,strlen(buf),0); memset(buf,0,BUFFER_LENGTH); printf("recv:/n"); ret=recv(connfd,buf,BUFFER_LENGTH,0); printf("%s/n",buf); } close(rwfd); return 0;}
编译:
gcc -o client client.c
5.2、Windows下可以使用NetAssist的网络助手工具
下载地址:http://old.tpyboard.com/downloads/NetAssist.exe
小结
至此,我们最终确定使用IO多路复用器epoll处理高并发。但是,上面的epoll实现的TCP服务器存在一些问题:
- 所有的连接都是使用相同的读写缓存(rbuff和wbuff),这会导致数据覆盖。
- 没有分包能力。
下一章节会解决这些问题,构建一个reactor网络模型。