本文共 6109 字,大约阅读时间需要 20 分钟。
在经典的Unix网络编程中,总结出了5种不同的网络IO模型,分别是阻塞式IO,非阻塞IO, IO多路复用,信号驱动IO,以及异步IO模型。
1. 阻塞式IO
2. 非阻塞式IO
3. IO多路复用
4. 信号驱动式IO
5. 异步IO模型
所以最终一比较,至少在linux平台下,目前主流的方案大多基于IO多路复用技术。
linux平台提供的主要的IO多路复用技术有select, poll, epoll,主要目的是为了能让一个或少量的select线程(或reactor线程),来管理多个连接,本质上是基于一个真实生产环境中的特性,比如虽然存在几万个连接,但是在某一时间范围内,有数据可读或者可写的socket并不会很多,既active的连接不会很多。当然如果在一个极端环境下面,比如是一个高速的局域网,并且每个client连接都会一直不断的发送数据,既每个连接都可以看成active的连接,那么基于IO多路复用技术未必是最佳方案。
linux系统提供select函数来实现多路复用输入/输出模型,select系统调用是用来让我们的应用程序监视多个文件句柄(包括socket文件句柄)的状态变化,程序会在select这里等待,直到被监视的文件句柄有一个或多个发生了状态改变如可读或可写
API原型为:
int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数说明:
n 一般为最大文件描述符 + 1, 既STDIN_FILENO + 1 该值会有limit限制,一般为1024,所以生产环境基本用epoll,kqueue等代替 readfds 监视是否有可读的文件描述符集合 writefds 监视是否有可写的文件描述符集合 exceptfds 监视是否有异常情况发生的文件描述符集合 timeout 超时时间,如果在某一段时间内依然没有相应事件触发,则会阻塞直到timeout时间过期 timeout.tv_sec单位为秒, timeout.tv_usec单位为微秒poll算select的加强版,但基本原理跟select类似,暂不赘述,后续在补充
由于select和poll系统调用存在以下几个问题,Linux内核2.6环境新增Event Poll的方式。
select/poll每次检查的时候是通过遍历所有的文件描述符(fd), 尤其是对于网络scoket而言,大部分存在这么一个特性,既某一个时间点里,只有很少一部分的socket是“活跃”状态,如果每次都是遍历所有的网络文件描述符的话,当文件描述符变大之后,性能就会随着连接数变大之后线型下降
select存在最大文件描述符的限制,具体取决于常量FD_SETSIZE,默认大小为1024, 不能满足大量的客户端连接.
反观epoll,则改进了以上不足的地方
在内核实现中epoll是根据每个fd上面的callback函数实现的。可以做到只有“活跃”的socket才会主动的去调用 callback函数,其他idle状态socket则不会
epoll没有对fd描述符有限制,理论上取决于系统内存大小, 可以通过命令 cat /proc/sys/fs/file-max查看,大概1G内存可以创建10w个连接
epoll的具体实现使用mmap加速内核与用户空间的消息传递,进一步提高性能
epoll实际包含3个系统调用组成,分别为epoll_create(), epoll_ctl(), epoll_wait()
1. epoll_create用于创建epoll的实例,其中参数size只要大于0即可,内核会动态获取大小,函数返回epoll本身的描述符
int epoll_create(int size);
2. epoll_ctl用于添加,修改,删除要监听的event事件
参数op为EPOLL_CTL_ADD代表添加,EPOLL_CTL_MOD代表修改,EPOLL_CTL_DEL代表删除int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
3. epoll_wait用于监视等待是否有IO事件发生,直到timeout过期
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll中存在两种工作模式 LT 和 ET
二者的差异在于level-trigger (LT) 模式下只要某个 socket 处于 readable/writable 状态,无论什么时候进行 epoll_wait 都会返回该 socket;
edge-trigger (ET) 模式下只有某个 socket 从 unreadable 变为 readable 或从unwritable 变为 writable 时,epoll_wait 才会返回该 socket。
如下两个示意图:
从socket读数据:
从socket写数据:
所以, 在epoll的ET模式下, 正确的读写方式为:
读: 只要可读, 就一直读, 直到返回0, 或者 errno = EAGAIN 写: 只要可写, 就一直写, 直到数据发送完, 或者 errno = EAGAIN/* * epoll_server.c */#include#include #include #include #include #include #include #define BUF_SIZE 100#define EPOLL_SIZE 50void error_handling(char *buf){ fputs(buf, stderr); fputc('\n', stderr); exit(1);}int main(int argc, char *argv[]){ int serv_sock, clnt_sock; struct sockaddr_in serv_adr, clnt_adr; socklen_t adr_sz; int str_len, i; char buf[BUF_SIZE]; struct epoll_event *ep_events; struct epoll_event event; int epfd, event_cnt; /* socket init */ serv_sock = socket(PF_INET, SOCK_STREAM, 0); memset(&serv_adr, 0, sizeof(serv_adr)); serv_adr.sin_family = AF_INET; serv_adr.sin_addr.s_addr = htonl(INADDR_ANY); serv_adr.sin_port = htons(8888); /* socket bind & listen */ if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) { error_handling("bind() error"); } if (listen(serv_sock, 5) == -1) { error_handling("listen() error"); } /* epoll create */ epfd = epoll_create(EPOLL_SIZE); ep_events = malloc(sizeof(struct epoll_event) * EPOLL_SIZE); event.events = EPOLLIN; event.data.fd = serv_sock; epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event); while (1) { /* block and wait */ event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); if (event_cnt == -1) { puts("epoll_wait() error"); break; } /* event trigger */ for (i = 0; i < event_cnt; i++) { /* serv_socket event */ if (ep_events[i].data.fd == serv_sock) { adr_sz = sizeof(clnt_adr); clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &adr_sz); /* add client fd to epoll */ event.events = EPOLLIN; event.data.fd = clnt_sock; epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event); printf("connected client: %d \n", clnt_sock); } /* other event */ else { memset(buf, '\0', BUF_SIZE); str_len = read(ep_events[i].data.fd, buf, BUF_SIZE); if (str_len == 0) // close request! { epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); close(ep_events[i].data.fd); printf("closed client: %d \n", ep_events[i].data.fd); } else { printf("receive msg: %s\n", buf); write(ep_events[i].data.fd, buf, str_len); // echo! } } } } close(serv_sock); close(epfd); return 0;}
/* * epoll_client.c */#include#include #include #include #include #include #define BUF_SIZE 1024void error_handling(char *message){ fputs(message, stderr); fputc('\n', stderr); exit(1);}int main(int argc, char *argv[]){ int sock; char message[BUF_SIZE]; int str_len; struct sockaddr_in serv_adr; /* check the input */ if (argc != 2) { printf("Usage : %s \n", argv[0]); exit(1); } /* socket */ sock = socket(PF_INET, SOCK_STREAM, 0); if (sock == -1) { error_handling("socket() error"); } /* */ memset(&serv_adr, 0, sizeof(serv_adr)); serv_adr.sin_family = AF_INET; serv_adr.sin_addr.s_addr = inet_addr(argv[1]); serv_adr.sin_port = htons(8888); if (connect(sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) { error_handling("connect() error!"); } else { puts("Connected..........."); } while (1) { /* input the message */ fputs("Input message(Q to quit): ", stdout); fgets(message, BUF_SIZE, stdin); if (!strcmp(message, "q\n") || !strcmp(message, "Q\n")) { break; } /* send the message from socket */ write(sock, message, strlen(message)); /* read the message from socket */ str_len = read(sock, message, BUF_SIZE - 1); message[str_len] = 0; printf("Message from server: %s", message); } close(sock); return 0;}
转载地址:http://slldi.baihongyu.com/