epoll是linux引以为荣的技术,因为相对于select和poll有很大的性能改进。本文主要介绍epoll的实现原理,了解epoll高效背后的魔法。
1. epoll_create
使用epoll时需要使用epoll_create()创建一个epoll的文件句柄,epoll_create()函数的原型如下:
intepoll_create(int size);
此接口用于创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
2. epoll_ctl
使用epoll_ctl()可以向epoll句柄添加或者删除要监听的文件句柄。epoll_ctl()函数原型如下:
intepoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
要监听文件是否可读写时先要向epoll句柄注册要监听的事件类型。第一个参数是epoll_create()返回的epoll句柄,第二个参数表示动作,用三个宏来表示:
第三个参数是需要监听的文件句柄,第四个参数是告诉内核需要监听什么事。
3. epoll_wait
万事俱备,现在只需要调用epoll_wait()函数就可以开始等待事件发生了。epoll_wailt()函数的原型如下:
intepoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
前面介绍了epoll的使用,接下来主要介绍epoll在内核的实现原理。
当我们在用户态调用epoll_create()时,会触发调用内核的sys_epoll_create()。我们先来看看sys_epoll_create()这个函数:
SYSCALL_DEFINE1(epoll_create1, int, flags) { int error, fd; structeventpoll *ep = NULL; structfile *file; ... error = ep_alloc(&ep); ... fd = get_unused_fd_flags(O_RDWR| (flags & O_CLOEXEC)); ... file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags &O_CLOEXEC)); ... fd_install(fd,file); ep->file = file; return fd; }
这个函数主要调用ep_alloc()申请一个eventpoll结构,eventpoll结构是epoll的核心数据结构,我们来看看这个结构的定义:
structeventpoll { spinlock_t lock; structmutexmtx; wait_queue_head_t wq; wait_queue_head_t poll_wait; structlist_headrdllist; structrb_rootrbr; structepitem *ovflist; structuser_struct *user; structfile *file; intvisited; structlist_headvisited_list_link; };
eventpoll结构有三个字段是比较重要的,分别是:wq、rdllist和rbr。
创建完eventpoll结构后,sys_epoll_create()会调用get_unused_fd_flags()获取一个空闲的文件句柄fd,接着调anon_inode_getfile()获取一个空闲的file结构,并且把eventpoll结构与file结构绑定。最后调用fd_install()把文件句柄fd与file结构绑定,返回文件句柄fd。通过一系列的操作后,内核就可以通过文件句柄fd与eventpoll结构进行关联。
根据epoll的使用流程,使用epoll_create()创建epoll句柄后,可以通过epoll_ctl()函数向epoll句柄添加和删除要监视的文件句柄。调用epoll_ctl()会触发内核调用sys_epoll_ctl()函数,我们来看看sys_epoll_ctl()函数的最重要部分:
SYSCALL_DEFINE4(epoll_ctl,int, epfd, int, op, int, fd, struct epoll_event __user *,event) { ... switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds,tfile, fd); } else error = -EEXIST; clear_tfile_check_list(); break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi,&epds); } else error = -ENOENT; break; } ... return error; }
sys_epoll_ctl()会根据我们传递的op参数来进行不同的操作,我们主要看看op为EPOLL_CTL_ADD的操作,也就是添加操作。当进行添加操作时,sys_epoll_ctl()最终会调用ep_insert()把文件句柄fd添加到eventpoll结构维护的红黑树中,ep_insert()代码如下:
static int ep_insert(struct eventpoll *ep, struct epoll_event*event, struct file *tfile, int fd) { int error, revents, pwake =0; unsignedlong flags; structepitem *epi; structep_pqueue epq; ... if (!(epi =kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM; ... init_poll_funcptr(&epq.pt,ep_ptable_queue_proc); revents = tfile->f_op->poll(tfile,&epq.pt); ... ep_rbtree_insert(ep, epi); ... return 0; }
ep_insert()函数首先创建一个epitem结构用于管理事件,然后调用文件句柄的poll()接口,根据init_poll_funcptr(&epq.pt,ep_ptable_queue_proc)这行代码的设置,poll()接口最终会调用ep_ptable_queue_proc()函数。ep_ptable_queue_proc()函数代码如下:
static void ep_ptable_queue_proc(struct file *file,wait_queue_head_t *whead, poll_table*pt) { structepitem *epi = ep_item_from_epqueue(pt); structeppoll_entry *pwq; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { epi->nwait = -1; } }
ep_ptable_queue_proc()函数的作用就是把epitem结构添加到文件的等待队列中,根据上面的代码可知,当等待队列被唤醒的时候,将会调用ep_poll_callback()函数。而ep_poll_callback()函数的作用就是把epitem结构放置到eventpoll结构的rdllist队列中。我们前面分析过,rdllist就是就绪的文件队列。ep_poll_callback()函数最终会调用wake_up_locked(&ep->wq)唤醒进程。简化后的代码如下:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) { ... // 把epitem添加到rdllist队列中 if(!ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink,&ep->rdllist); // 唤醒进程 if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); ... return 1; }
ep_insert()函数最后一个操作就是调用ep_rbtree_insert()把epitem结构添加的eventpoll结构的红黑树中。如下图:
使用红黑树管理epitem结构的目的是可以根据文件句柄fd快速查找到对应的epitem结构。红黑树是一棵平衡二叉树,时间复杂度为O(logN)。
添加文件句柄到epoll之后,就可以调用epoll_wait()函数开始监听文件。epoll_wait()会调用内核的sys_epoll_wait()函数,而sys_epoll_wait()最终会调用ep_poll()函数,代码如下:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user*events, int maxevents, long timeout) { ... if (list_empty(&ep->rdllist)) { init_waitqueue_entry(&wait,current); wait.flags |= WQ_FLAG_EXCLUSIVE; __add_wait_queue(&ep->wq,&wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist)|| !jtimeout) break; if (signal_pending(current)) { res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); jtimeout = schedule_timeout(jtimeout); spin_lock_irqsave(&ep->lock,flags); } __remove_wait_queue(&ep->wq,&wait); set_current_state(TASK_RUNNING); } ... if (!res && eavail && !(res = ep_send_events(ep, events, maxevents))&& jtimeout) goto retry; return res; }
ep_poll()函数所做的事情很简单,就是把当前进程设置为可中断睡眠状态,然后添加eventpoll结构的等待队列中,最后调用schedule_timeout()让出CPU。这样当前进程就会进入睡眠状态,当进程醒来的时候会判断eventpoll结构的rdllist队列是否为空,然后不为空就调用ep_send_events()函数把可读写的文件拷贝到用户态的events数组中。
那么什么时候当前进程会被唤醒呢?在分析ep_insert()函数的时候,我们提及过当文件状态发生改变时会调用ep_poll_callback()函数,而ep_poll_callback()函数会把就绪的文件添加到rdllist队列,并且就会把当前进程唤醒。
本文主要分析了epoll的实现原理,可以知道,epoll并不会对所有文件进行扫描(而select和poll会对所以文件进行扫描),而是使用事件的方式把就绪的文件收集起来,所以epoll的效率非常高。
当然本文还有一些epoll的细节并没有介绍到,例如水平触发和边缘触发等,有兴趣可以自己研究代码。