Kqueue和其他的多路复用IO的核心是,单消费者同时监听不同种类的生产者,从而提供高性能的单线程IO,减少调度开销。而Kqueue通过在内核态维持状态提供了更高的性能。
单Producer和单Consumer
生产者/消费者模型是常见的通信模型,通过共享内核缓冲区环形队列,实现异步的事件通知。双方只关注缓冲区内的数据,而不关注彼此,因此常常被用于网络通信。
信号量
为了避免消费者在缓存区未满时无意义的轮询,消费者block直到生产者通知。wait时线程设置信号量并且block,notify时内核通知所有等待信号的线程状态改为RUNNABLE。
事实上就是linux的pthread_cond_wait和phread_cond_signal原语。consumer之所以要带锁wait,是因为在内部进行调度yield_wait前要放掉锁,否则其他线程无法进入临界区;唤醒之后重新获得锁。(这里指的锁是外部事务的锁)
wait和notify需要增加锁,防止notify先于wait进行。(这里的锁指的是内部事务的锁)
wait调用的yield_wait在调度时需要临时释放并随后获取内部事务锁,否则会阻塞其他的notify造成全员block。
send(bb, msg):
acquire(bb.lock)
while True:
if bb.in - bb.out < N:
bb.buf[bb.in mod N] <- msg
bb.in <- bb.in + 1
release(bb.lock)
notify(bb.not_empty)
return
wait(bb.not_full, bb.block)
receive(bb):
acquire(bb.lock)
while True:
if bb.in > bb.out:
msg <- bb.buf[bb.out mod N]
bb.out <- bb.out + 1
release(bb.lock)
wait(bb.not_full)
return
wait(bb.not_empty, bb.block)
Eventcount & Sequencer
这是1979年提出的算法,作为信号量的可替换实现。Sequencer的目的是处理多producer。
semaphores
send(Buffer& buffer,Message msg) {
t=TICKET(T);
AWAIT(buffer.in, t);
AWAIT(buffer.out, READ(buffer.in)-N);
buffer[READ(buffer.in)%N]=msg;
ADVANCE(in);
}
receive(Buffer& buffer) {
AWAIT(buffer.in, READ(buffer.out));
msg = buffer[READ(buffer.out)%N];
ADVANCE(buffer.out);
return msg;
}
send等待in超过ticket,相当于拿排队锁轮到自己。然后等待缓存区未满时写入数据。
receive等待缓冲区存在数据时读取数据。
https://people.freebsd.org/~jlemon/papers/kqueue.pdf
问题在于,上面提到的做法本质上都是监听着一个事件,如果我们想要处理多个监听事件,操作系统必须提供新的原语,例如每个socket都对应着一个file descriptor,需要同时监听所有socket的事件。BSD的Kqueue和Linux的epoll都是解决这种问题的方式,本质上它们就是IPC,但是单纯从IO的角度看叫做多路复用IO。目前epoll用于netty的底层,是单线程实现高性能网络的关键。
传统的select和poll仅仅适用于file descriptor,但是无法关注其他IPC机制,例如信号、文件系统变化、异步IO完成、进程存在;并且也不具备scalability。
第一个问题在于参数传递,每次都必须传递整个事件组,并且动态在内核中创建和销毁内存。第二个问题在于内核必须遍历整个fd列表去找活跃的fd。初始遍历一次确定没有active的fd才能沉睡,如果没有active还要再遍历一次设定回调来唤醒,最后唤醒时还要再遍历一次来看是哪个fd唤醒了。
问题出在这个syscall无状态上,无法利用之前的信息,每次都得重新计算。因此Kqueue的机制就在于内核中维持一个队列储存状态。
int
kqueue(void);
int
kevent(int kq,const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);
struct kevent{
uintpt t ident; // 事件关注对象的ID,kq,ident,filter确定唯一的event
// 事件类型,ident,fflags,data应该如何被解释?
u short flags; // 输入: 增加/减少,使能/禁止, 执行后重置/删除;输出: 发生EOF或者ERROR
u int fflags; // 活跃时应该怎么做,是否返回event?
intptr t data; // filter和fflags规定的数据传输方式
void *udata; // 自定义的数据传输方式
__uint64_t ext[4]; //在末尾增加的额外信息Hint
}
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
kevent()用于创建kqueue并且返回对应的capability(权限控制的抽象)。
kevent()用于注册event,并设定超时,changelist是指kqueue注册的event如何变化,eventlist则是返回的event。当event触发时,会调用内核的回调函数,通知进程。
filter
NOTE DELETE
NOTE WRITE
NOTE EXTEND
NOTE ATTRIB
NOTE LINK
NOTE RENAME
NOTE EXIT/FORK/EXEC 监听exit,fork,execve等原语
NOTE TRACK 若父进程设定为Track则fork后子进程为CHILD
输出:
NOTE CHILD 子进程fork后设定child,并且父进程id存入data
NOTE TRACKERR 无法添加子进程事件,通常因为资源限制
sample
handle_events()
{
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
n = kevent(kq, ch, nchanges,
evi, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
if (evi.filter == EVFILT_READ)
readable_fd(evi.ident);
if (evi.filter == EVFILT_WRITE)
writeable_fd(evi.ident);
}
...
}
update_fd(int fd, int action,int filter)
{
EV_SET(&chnchanges, fd, filter,action == ADD ?
EV_ADD : EV_DELETE,
0, 0, 0);
nchanges++;
}
Knote
struct knote {
SLIST_ENTRY(knote) kn_link; /* for kq */
SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */
struct knlist *kn_knlist; /* f_attach populated */
TAILQ_ENTRY(knote) kn_tqe;
struct kqueue *kn_kq; /* which queue we are on */
struct kevent kn_kevent;
void *kn_hook;
int kn_hookid;
int kn_status; /* protected by kq lock */
#define KN_ACTIVE 0x01 /* event has been triggered */
#define KN_QUEUED 0x02 /* event is on queue */
#define KN_DISABLED 0x04 /* event is disabled */
#define KN_DETACHED 0x08 /* knote is detached */
#define KN_MARKER 0x20 /* ignore this knote */
#define KN_KQUEUE 0x40 /* this knote belongs to a kq */
#define KN_SCAN 0x100 /* flux set in kqueue_scan() */
int kn_influx;
int kn_sfflags; /* saved filter flags */
int64_t kn_sdata; /* saved data field */
union {
struct file *p_fp; /* file data pointer */
struct proc *p_proc; /* proc pointer */
struct kaiocb *p_aio; /* AIO job pointer */
struct aioliojob *p_lio; /* LIO job pointer */
void *p_v; /* generic other pointer */
} kn_ptr;
struct filterops *kn_fop;
#define kn_id kn_kevent.ident
#define kn_filter kn_kevent.filter
#define kn_flags kn_kevent.flags
#define kn_fflags kn_kevent.fflags
#define kn_data kn_kevent.data
#define kn_fp kn_ptr.p_fp
};
Kqueue
struct kqueue {
struct mtx kq_lock;
int kq_refcnt;
TAILQ_ENTRY(kqueue) kq_list;
TAILQ_HEAD(, knote) kq_head; /* list of pending event */
int kq_count; /* number of pending events */
struct selinfo kq_sel;
struct sigio *kq_sigio;
struct filedesc *kq_fdp;
int kq_state;
#define KQ_SEL 0x01
#define KQ_SLEEP 0x02
#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */
#define KQ_ASYNC 0x08
#define KQ_CLOSING 0x10
#define KQ_TASKSCHED 0x20 /* task scheduled */
#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */
int kq_knlistsize; /* size of knlist */
struct klist *kq_knlist; /* list of knotes */
u_long kq_knhashmask; /* size of knhash */
struct klist *kq_knhash; /* hash table for knotes */
struct task kq_task;
struct ucred *kq_cred;
};
Registration
kqueue
kqueue本身作为文件抽象看待,在OFT里注册entry创建内核对象并赋予descriptor索引。hash和内部的array并不分配。
kevent
int
kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents, const struct timespec *timeout)
{
return (((int (*)(int, const struct kevent *, int,
struct kevent *, int, const struct timespec *))
__libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges,
eventlist, nevents, timeout));
}
这里调用了kqueue_register来对changeList进行注册。首先根据线程和fd获取文件的FCB,kq对于fp引用计数++,然后调用实际的注册函数。注册的代码太长了,大体就是先根据<Iden,filter>寻找knote节点,找不到如果是EV_ADD则增加knote,否则把事件增加到knote上去。
int
kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
{
struct kqueue *kq;
struct file *fp;
cap_rights_t rights;
int error;
error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
if (error != 0)
return (error);
if ((error = kqueue_acquire(fp, &kq)) != 0)
goto noacquire;
error = kqueue_register(kq, kev, td, mflag);
kqueue_release(kq, 0);
noacquire:
fdrop(fp, td);
return (error);
}
Filter
filter的作用就是对于事件源进行过滤,事件源所有的活动都会调用filter,但是只有符合filter规则的事件才会报告给应用,也就是返回布尔值,同时他也会修改fflags和data产生副作用(上面提到的输出参数)。filter封装了事件,kqueue只能询问他是否活跃,而对事件的细节一无所知。因此只需要增加filter,就能拓展事件的内容。
Activity
在所有触发这些活动的地方插入hook函数,调用knote()函数遍历自己维护的klist(注册的时候维护的),调用filter。
如果事件触发则激活,通过knote找到其所属的kqueue,并且将knote加入kqueue的active链末尾。如果已经在了,那么不用增加knote,但是filter还是会记录activity(e.g.上文提到的副作用)。
这里有些special case,例如fork需要看是不是TRACK,来判断是否报告子节点的PID
Additionally, for each knote attached to the parent, check whether user wants to track the new process. If so, attach a new knote to it, and immediately report an event with the child's pid.
首先,激活父进程的knote,然后创建新的knote分配给子节点,并且设置CHILD flag和对应的父进程PID。同时这里还提到了可能存在事件可能改变data,因此为EXIT额外分配一个节点。
/*
* Activate existing knote and register tracking knotes with
* new process.
*
* First register a knote to get just the child notice. This
* must be a separate note from a potential NOTE_EXIT
* notification since both NOTE_CHILD and NOTE_EXIT are defined
* to use the data field (in conflicting ways).
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT |
EV_FLAG2;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
/*
* Then register another knote to track other potential events
* from the new process.
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
if (kn->kn_fop->f_event(kn, NOTE_FORK))
KNOTE_ACTIVATE(kn, 0);
list->kl_lock(list->kl_lockarg);
KQ_LOCK(kq);
kn_leave_flux(kn);
KQ_UNLOCK_FLUX(kq);
Delivery
kqueue_scan在active链末尾加入哨兵,如果scan时扔出了哨兵,那么遍历结束。
每次都从active移除一个节点(注意检查timeout,过期也要移除,DISABLE也是在这里移除),如果不是ONESHOP,那么filter带着query hint重新检查一遍是否激活,防止途中又被修改。
The rationale for this is the case where data arrives for a socket, which causes the knote to be queued, but the Application happens to call read() and empty the socket buffer before calling kevent. If the knote was still queued, then an event would be returned telling the application to read an empty buffer.
确认激活的knote的信息将会拷贝到kevnet通过eventlist返回给应用进行通知。如果ONESHOP则直接从kqueue中移除,否则如果filter看它仍然active,就把它重新放到active链末尾(上次扫描的哨兵之后)。直到哨兵被出列,scan完成。
Miscellaneous Notes
1.论文的版本fork的时候不复制kqueue的df除非vfork。如果复制的话需要在fork时进行整个kqueue复制或者标记为COW。(现在不知道是不是这么做的)
2.kqueue是通过维护klist来对整条链涉及的所有进程进行通知的,而不是像poll或者select那样在sellInfo持有pid。下面这段话看不懂了,没看过poll不知道啥叫collision。
While this may be a natural outcome from the way knotes are implemented, it also means that the kqueue system is not susceptible to select collisions. As each knote is queued in the active list, only processes sleeping on that kqueue are woken up
3.考虑同一个klist有不同类型的filter,调用knode时应该给予额外信息通知他到底是什么事件触发的(例如PROC和SIGNAL容易混淆),因此利用hint确定activity和哪个相关
4. kevent要经历两次拷贝,增加了overhead。因此如果采用AIO更好,kernel直接修改user状态下的control block。那么为什么不这么做呢?根本原因在于如果内核不允许直接写用户态数据的话,bug会更好定位,同时应用也不需要考虑状态。
精妙之处在于kqueue维持在内核中,因此socket如果满了可以直接将knote加入进程kqueue的活跃链,而不需要等到下次syscall的时候再检查。例如,即使我长期不kevent,knote()依然会将他们的activity存储在knote上并且插入active list,下次只需要遍历active list而不需要重头遍历整个queue。
同时因为kqueue有状态,进行修改也开销很小,只需要改变变化的那部分就行了。
看的时候还是有些地方比较难理解,加上源代码也很复杂,如果有纠错请指正。
filechange
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, fd, EVFILT_VNODE,
EV_ADD | EV_ENABLE | EV_CLEAR,
NOTE_RENAME | NOTE_WRITE |
NOTE_DELETE | NOTE_ATTRIB, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0) {
printf("The file was");
if (ev.fflags & NOTE_RENAME)
printf(" renamed");
if (ev.fflags & NOTE_WRITE)
printf(" written");
if (ev.fflags & NOTE_DELETE)
printf(" deleted");
if (ev.fflags & NOTE_ATTRIB)
printf(" chmod/chowned");
printf("n");
}
signal
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, SIGHUP, EVFILT_SIGNAL,
EV_ADD | EV_ENABLE, 0, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
signal(SIGHUP, SIG_IGN);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0)
printf("signal %d delivered"
" %d timesn",
ev.ident, ev.data);
}
udata
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
void (* fcn)(struct kevent *);
n = kevent(kq, ch, nchanges,
ev, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
fcn = evi.udata;
fcn(&evi);
}