在网络编程系列文章中,我们实现了一个基于epoll的网络框架,并在此基础上开发了一个简单的HTTP服务,在那个系列文章中我们使用了读、写两个buffer将网络IO和数据的读写进行了分离,它们之间的扭转完全通过epoll事件通知,如果你认真研究过源码,会发现,所有针对网络IO的操作都是由事件触发的。这种基于事件触发的网络模型通常我们叫做Reactor网络模型。
由于网络编程系列文章中代码实现相对比较复杂,不太好讲清楚。所以,我决定单独出几篇文章对那个系列文章进行一些拓展,主要涉及到网络编程思想和性能测试。
这篇文章我们通过实现一个简单的网络框架,来说明Reactor网络模型实现的一般思路,其本质思想和x.NET项目基本上是一样的,只是在代码上做了非常大的精简,理解起来会轻松很多。
#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
int mAIn() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(struct sockaddr_in));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(2048);
if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
perror("bind fail");
return -1;
}
listen(sockfd, 10);
printf("sock-fd:%dn", sockfd);
int epfd = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
struct epoll_event events[1024] = {0};
while(1) {
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0; i < nready; i++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN && sockfd == connfd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
printf("clientfd: %dn", clientfd);
} else if (events[i].events & EPOLLIN) {
char buffer[10] = {0};
int count = recv(connfd, buffer, 10, 0);
if (count == 0) {
printf("discounnectn");
epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
close(i);
continue;
}
send(connfd, buffer, count, 0);
printf("clientfd: %d, count: %d, buffer: %sn", connfd, count, buffer);
}
}
}
}
熟悉epoll的人应该对上面的代码比较熟悉,这段代码的核心在下面的while主循环,如果是当前Server的Socket说明有新的连接进来,调用accept拿到客户端的fd,将其放在epoll的events中,并注册EPOLLIN事件,一般我们理解为可读事件。
如果不是sockfd,说明是客户端的fd可读,我们将数据读出来再原样发送回去。
上面的代码存在的主要问题在于,套接字的accept和读写操作我们是直接写在主循环里了,这将会让代码的逻辑变得难以琢磨。
int recv_callback(int fd, char *buffer, int size);
int send_callback(int fd, char *buffer, int size);
int recv_callback(int fd, char *buffer, int size) {
int count = recv(fd, buffer, size, 0);
send_callback(fd, buffer, count, 0);
return count;
}
int send_callback(int fd, char *buffer, int size) {
int count = send(fd, buffer, size, 0);
return count;
}
int main() {
...
while(1) {
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0; i < nready; i++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN && sockfd == connfd) {
...
} else if (events[i].events & EPOLLIN) {
char buffer[10] = {0};
int count = recv_callback(fd, buffer, 10);
if (count == 0) {
printf("disconnect\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
clise(i);
continue;
}
}
}
}
}
int recv_callback(int fd, char *buffer, int size) {
int count = recv(fd, buffer, size, 0);
return count;
}
int send_callback(int fd, char *buffer, int size) {
int count = send(fd, buffer, size, 0);
return count;
}
但这样明显也是有问题的,在recv_callback中读完了之后,如何发送数据呢?这里,我们可以想一下,围绕着一个套接字都有哪些部分呢?是不是可以设计出一个类似字典的结构,这个字典的key对应的就是套接字,而value对应的就是围绕套接字相关的各个组件。
#define BUF_LEN 1024
typedef int(*callback)(int fd);
struct conn_channel {
int fd;
callback recv_call;
callback send_call;
char wbuf[BUF_LEN];
int wlen;
char rbuf[BUF_LEN];
int rlen;
};
struct conn_channel conn_map[1024] = {0};
int main() {
...
while(1) {
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0; i < nready; i++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN && sockfd == connfd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
ev.events = EPOLLIN;
ev.data.fd = clientaddr;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
conn_map[clientfd].fd = clientfd;
conn_map[clientfd].rlen = 0;
conn_map[clientfd].wlen = 0;
conn_map[clientfd].recv_call = recv_callback;
conn_map[clientfd].send_call = send_callback;
memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
memset(conn_map[clientfd].wbuf, 0, BUF_LEN);
printf("clientfd:%d\n", clientfd);
} else if (events[i].events & EPOLLIN) {
...
}
}
}
}
int recv_callback(int fd) {
int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
// do something
memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
conn_map[fd].wlen = conn_map[fd].rlen;
conn_map[fd].rlen = 0;
return count;
}
int send_callback(int fd) {
int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);
return count;
}
因为有了conn_map,所以原来传进来的buffer和size都不需要了,在conn_channel中已经有记录了。所以只需要一个fd参数就可以了。我们在recv_callback中模拟了回复消息,强行将读到的数据写到了wbuffer中。这里补充一下,conn_channel中的rbuffer是用来从套接字中读数据的,wbuffer表示的是将要发送到套接字的数据。
你可以试着把上面的代码跑起来,然后你会发现,并没有按我们的预期执行,send_callback中的send似乎没有起作用。这是因为我们只是将数据从rbuffer写到了wbuffer中,而send_callback并没有机会调用。你可以想一想send_callback放在哪里调用比较合适呢?
在上面的例子中,显然放在主循环中执行比较合适,在epoll中,EPOLLOUT表示可写事件,我们可以利用这个事件。在recv_callback执行完之后我们注册一个EPOLLOUT事件,然后在主循环中我们去监听EPOLLOUT事件。这样,当recv_callback将rbuffer的数据复制到wbuffer中之后,send_callback通过EPOLLOUT事件就可以在主循环中得以执行。
int recv_callback(int fd) {
int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
// do something
memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
conn_map[fd].wlen = conn_map[fd].rlen;
conn_map[fd].rlen = 0;
struct epoll_event ev;
ev.events = EPOLLOUT;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
return count;
}
int main() {
...
while(1) {
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0; i < nready; i++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN && sockfd == connfd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
ev.events = EPOLLIN;
ev.data.fd = clientaddr;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
conn_map[clientfd].fd = clientfd;
conn_map[clientfd].rlen = 0;
conn_map[clientfd].wlen = 0;
conn_map[clientfd].recv_call = recv_callback;
conn_map[clientfd].send_call = send_callback;
memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
memset(conn_map[clientfd].wbuf, 0, BUF_LEN);
printf("clientfd:%d\n", clientfd);
} else if (events[i].events & EPOLLIN) {
int count = conn_map[connfd].recv_call(connfd);
printf("recv-count:%d\n", count);
} else if (events[i].events & EPOLLOUT) { // 处理EPOLLOUT事件
int count = conn_map[connfd].send_call(connfd);
printf("send-count:%d\n", count);
}
}
}
}
要注意的是,epfd是在main函数中定义的,而我们在recv_callback中有使用,所以我们可以暂时将epfd声明成一个全局变量,放在外面。
int send_callback(int fd) {
int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
return count;
}
这样,我们就将IO操作给屏蔽了,在主循环中我们只关注事件,不同的事件调用不同的回调函数。在对应的回调函数中只做自己该做的,做完之后注册事件通知其它的回调函数。
int accept_callback(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
ev.events = EPOLLIN;
ev.data.fd = clientaddr;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
conn_map[clientfd].fd = clientfd;
conn_map[clientfd].rlen = 0;
conn_map[clientfd].wlen = 0;
conn_map[clientfd].recv_call = recv_callback;
conn_map[clientfd].send_call = send_callback;
memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
memset(conn_map[clientfd].wbuf, 0, BUF_LEN);
return clientfd;
}
struct conn_channel {
int fd;
union {
callback accept_call;
callback recv_call;
} call_t;
callback send_call;
char wbuf[BUF_LEN];
int wlen;
char rbuf[BUF_LEN];
int rlen;
};
int main() {
int sockfd = create_serv(9000);
if (sockfd == -1) {
perror("create-server-fail");
return -1;
}
make_nonblocking(sockfd);
epfd = epoll_create1(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
struct epoll_event events[1024] = {0};
conn_map[sockfd].rlen = 0;
conn_map[sockfd].wlen = 0;
conn_map[sockfd].fd = sockfd;
conn_map[sockfd].call_t.accept_call = accept_callback;
conn_map[sodkfd].send_call = send_callback;
memset(conn_map[sockfd].rbuf, 0, BUF_LEN);
memset(conn_map[sockfd].wbuf, 0, BUF_LEN);
while(1) {
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0; i < nready; i++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN) {
int count = conn_map[connfd].call_t.recv_call(connfd);
printf("recv-count:%d\n", count);
} else if (events[i].events & EPOLLOUT) {
int count = conn_map[connfd].send_call(connfd);
printf("send-count:%d\n", count);
}
}
}
}
你可以想一下,我们注册的是call_t.accept_call,但在调用的时候确是call_t.recv_call,为什么这样可行?
我们在网络编程系列文章中,单独为accept抽象出了一个对象,你可以对比一下这两种实现方式,看看它们有什么区别?在系列文件中我们为什么要单独抽象出一个accepter对象呢?
可以看到,最后主循环中的逻辑,只有两个分支,这两个分支代表了两种事件,这种通过事件驱动的网络模型便是Reactor网络模型。本文为了容易理解,将代码进行了精简。在实际的工程中我们还要考虑诸多情况。比如,上面的代码只支持epoll,我们是不是可以将事件驱动相关的代码抽象成单独的组件,让其可以支持其它的事件模型。
本文虽然代码简单,但Reactor网络模型的实现基本上都逃脱不了这个套路,只是在此基础上可能会将各个部分进行单独的封装,比如我们在网络编程系列文章中就将channel和map进行了抽象,让它能适配各种场景。
总结
reactor网络模型是网络编程中非常重要的一种编程思想,本文通过一个简短的示例试图讲明白reactor网络编程模型的核心思想。当然,本文的实现还不是很完善,比如在调用回调函数的时候还是传入了fd,我们是否可以不需要这个参数,彻彻底底地和IO进行分离呢?