您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

linux异步IO编程实例分析

时间:2020-09-23 16:07:34  来源:  作者:
linux异步IO编程实例分析

异步阻塞IO模型的流程图

在Direct IO模式下,异步是非常有必要的(因为绕过了pagecache,直接和磁盘交互)。linux Native AIO正是基于这种场景设计的,具体的介绍见:KernelAsynchronousI/O (AIO)SupportforLinux。下面我们就来分析一下AIO编程的相关知识。

阻塞模式下的IO过程如下:

int fd = open(const char *pathname, int flags, mode_t mode);
ssize_t pread(int fd, void *buf, size_t count, off_t offset);
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset);
int close(int fd);

因为整个过程会等待read/write的返回,所以不需要任何额外的数据结构。但异步IO的思想是:应用程序不能阻塞在昂贵的系统调用上让CPU睡大觉,而是将IO操作抽象成一个个的任务单元提交给内核,内核完成IO任务后将结果放在应用程序可以取到的地方。这样在底层做I/O的这段时间内,CPU可以去干其他的计算任务。但异步的IO任务批量的提交和完成,必须有自身可描述的结构,最重要的两个就是iocb和io_event。

libaio中的structs

struct iocb {
        void     *data;  /* Return in the io completion event */
        unsigned key;   /*r use in identifying io requests */
        short           aio_lio_opcode;        short           aio_reqprio;        int             aio_fildes;
        union {
                struct io_iocb_common           c;
                struct io_iocb_vector           v;
                struct io_iocb_poll             poll;
                struct io_iocb_sockaddr saddr;
        } u;};struct io_iocb_common {
        void            *buf;
        unsigned long   nbytes;
        long long       offset;
        unsigned        flags;
        unsigned        resfd;
};

iocb是提交IO任务时用到的,可以完整地描述一个IO请求:

data是留给用来自定义的指针:可以设置为IO完成后的callback函数;

aio_lio_opcode表示操作的类型:IO_CMD_PWRITE | IO_CMD_PREAD;

aio_fildes是要操作的文件:fd;

io_iocb_common中的buf, nbytes, offset分别记录的IO请求的mem buffer,大小和偏移。

struct io_event {
        void *data;
        struct iocb *obj;
        unsigned long res;
        unsigned long res2;
};

io_event是用来描述返回结果的:

obj就是之前提交IO任务时的iocb;

res和res2来表示IO任务完成的状态。

libaio提供的API和完成IO的过程

libaio提供的API有:io_setup, io_submit, io_getevents, io_destroy。

1. 建立IO任务

int io_setup (int maxevents, io_context_t *ctxp);

io_context_t对应内核中一个结构,为异步IO请求提供上下文环境。注意在setup前必须将io_context_t初始化为0。

当然,这里也需要open需要操作的文件,注意设置O_DIRECT标志。

2.提交IO任务

long io_submit (aio_context_t ctx_id, long nr, struct iocb **iocbpp);

提交任务之前必须先填充iocb结构体,libaio提供的包装函数说明了需要完成的工作:

void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{        memset(iocb, 0, sizeof(*iocb));
        iocb->aio_fildes = fd;        iocb->aio_lio_opcode = IO_CMD_PREAD;        iocb->aio_reqprio = 0;
        iocb->u.c.buf = buf;        iocb->u.c.nbytes = count;        iocb->u.c.offset = offset;}void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{        memset(iocb, 0, sizeof(*iocb));
        iocb->aio_fildes = fd;        iocb->aio_lio_opcode = IO_CMD_PWRITE;        iocb->aio_reqprio = 0;
        iocb->u.c.buf = buf;        iocb->u.c.nbytes = count;        iocb->u.c.offset = offset;}

这里注意读写的buf都必须是按扇区对齐的,可以用posix_memalign来分配。

3.获取完成的IO

long io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

这里最重要的就是提供一个io_event数组给内核来copy完成的IO请求到这里,数组的大小是io_setup时指定的maxevents。

timeout是指等待IO完成的超时时间,设置为NULL表示一直等待所有到IO的完成。

4.销毁IO任务

int io_destroy (io_context_t ctx);

注:需要C/C++ Linux服务器开发学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQLredis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等),免费分享

linux异步IO编程实例分析

 

libaio和epoll的结合

在异步编程中,任何一个环节的阻塞都会导致整个程序的阻塞,所以一定要避免在io_getevents调用时阻塞式的等待。还记得io_iocb_common中的flags和resfd吗?看看libaio是如何提供io_getevents和事件循环的结合:

void io_set_eventfd(struct iocb *iocb, int eventfd)
{        iocb->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
        iocb->u.c.resfd = eventfd;}

这里的resfd是通过系统调用eventfd生成的。

int eventfd(unsigned int initval, int flags);

eventfd是linux 2.6.22内核之后加进来的syscall,作用是内核用来通知应用程序发生的事件的数量,从而使应用程序不用频繁地去轮询内核是否有时间发生,而是由内核将发生事件的数量写入到该fd,应用程序发现fd可读后,从fd读取该数值,并马上去内核读取。

有了eventfd,就可以很好地将libaio和epoll事件循环结合起来:

1. 创建一个eventfd

efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

2. 将eventfd设置到iocb中

io_set_eventfd(iocb, efd);

3. 交接AIO请求

io_submit(ctx, NUM_EVENTS, iocb);

4. 创建一个epollfd,并将eventfd加到epoll中

epfd = epoll_create(1);
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
epoll_wait(epfd, &epevent, 1, -1);

5. 当eventfd可读时,从eventfd读出完成IO请求的数量,并调用io_getevents获取这些IO

read(efd, &finished_aio, sizeof(finished_aio);
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
linux异步IO编程实例分析

异步非阻塞IO模型的流程图

一个完整的编程实例

#define _GNU_SOURCE
#define __STDC_FORMAT_macROS#include <stdio.h>#include <errno.h>#include <libaio.h>#include <sys/eventfd.h>#include <sys/epoll.h>#include <stdlib.h>#include <sys/types.h>#include <unistd.h>#include <stdint.h>#include <sys/stat.h>#include <fcntl.h>#include <inttypes.h>#define TEST_FILE   "aio_test_file"
#define TEST_FILE_SIZE  (127 * 1024)
#define NUM_EVENTS  128
#define ALIGN_SIZE  512
#define RD_WR_SIZE  1024
struct custom_iocb
{    struct iocb iocb;
    int nth_request;};void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ldn", 
            iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
            iocb->u.c.offset, iocb->u.c.nbytes, res, res2);}int main(int argc, char *argv[])
{    int efd, fd, epfd;    io_context_t ctx;    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;    void *buf;    struct epoll_event epevent;
    efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd == -1) {
        perror("eventfd");
        return 2;
    }    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        return 3;
    }    ftruncate(fd, TEST_FILE_SIZE);        ctx = 0;
    if (io_setup(8192, &ctx)) {
        perror("io_setup");
        return 4;
    }    if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
        perror("posix_memalign");
        return 5;
    }    printf("buf: %pn", buf);
    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        iocbps[i] = &iocbp->iocb;        io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);        io_set_eventfd(&iocbp->iocb, efd);        io_set_callback(&iocbp->iocb, aio_callback);        iocbp->nth_request = i + 1;
    }    if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
        perror("io_submit");
        return 6;
    }    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        return 7;
    }    epevent.events = EPOLLIN | EPOLLET;    epevent.data.ptr = NULL;    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        return 8;
    }    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;        if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
            perror("epoll_wait");
            return 9;
        }        if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
            perror("read");
            return 10;
        }        printf("finished io number: %"PRIu64"n", finished_aio);
            while (finished_aio > 0) {
            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
            if (r > 0) {
                for (j = 0; j < r; ++j) {
                    ((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);                }                i += r;                finished_aio -= r;            }        }    }        close(epfd);    free(buf);    io_destroy(ctx);    close(fd);    close(efd);    remove(TEST_FILE);    return 0;
}


说明:

1. 在centos 6.2 (libaio-devel 0.3.107-10) 上运行通过

2. struct io_event中的res字段表示读到的字节数或者一个负数错误码。在后一种情况下,-res表示对应的

errno。res2字段为0表示成功,否则失败

3. iocb在aio请求执行过程中必须是valid的

4. 在上面的程序中,通过扩展iocb结构来保存额外的信息(nth_request),并使用iocb.data

来保存回调函数的地址。如果回调函数是固定的,那么也可以使用iocb.data来保存额外信息。



Tags:异步IO编程   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
异步阻塞IO模型的流程图在Direct IO模式下,异步是非常有必要的(因为绕过了pagecache,直接和磁盘交互)。linux Native AIO正是基于这种场景设计的,具体的介绍见:KernelAsynchronous...【详细内容】
2020-09-23  Tags: 异步IO编程  点击:(96)  评论:(0)  加入收藏
▌简易百科推荐
1. 前言了解响应式编程,首先我们需要了解函数式操作和Stream的操作,下面我们简单的复习一下喽。1.1 常用函数式编程函数式接口中我们先来回顾一下Java中的函数式接口。常见的...【详细内容】
2022-07-15  二哥学Java    Tags:编程   点击:(1)  评论:(0)  加入收藏
在本文中,我们将学习如何使用 Next.js、 Prisma、 Postgres 和 Fastify 构建一个 Full-stack 应用程序。在本文中,我们将学习如何使用 Next.js、 Prisma、 Postgres 和 Fastif...【详细内容】
2022-07-12  qaseven    Tags:全栈   点击:(9)  评论:(0)  加入收藏
好的软件开发网站有哪些?做软件开发哪些网站能提供帮助呢?这些很多做软件开发的小伙伴都会问到的问题。007出海全球社交流量导航网站,整合了多方出海跨境网站资源,为你介绍出海...【详细内容】
2022-07-08  Chuhai007    Tags:软件开发   点击:(10)  评论:(0)  加入收藏
我们用monkey做压力测试后,会保存一个monkey日志,那如果想快速的分析日志中有哪些异常,我们可以用批处理工具进行快速的筛查,我们一起来看看吧。先编写个小脚本,然后修改为bat后...【详细内容】
2022-07-08  溪流涌动    Tags:monkey   点击:(13)  评论:(0)  加入收藏
白盒测试落地实践分为两个大方向,一个是静态分析,一个是动态分析,当然啦,也可以叫做静态测试和动态测试。那我们如何高质量保效率的做好白盒测试呢?Parasoft已经为您准备好了成熟...【详细内容】
2022-07-08  Parasoft中国    Tags:白盒测试   点击:(11)  评论:(0)  加入收藏
Altium Designer 自带脚本功能的开发项目,可以调用官方AD API接口对原理图或者PCB进行自动操作,本文主要分享开发的流程,和一些基本的概念信息,本文介绍的脚本工具例子可以用在P...【详细内容】
2022-07-07  电子工程师伟哥    Tags:Altium Designer   点击:(21)  评论:(0)  加入收藏
一、目录介绍 前置知识点 NIO Netty 的核心组件 Channel Callback Future 和 Promise 事件和 ChannelHandler Hello World二、前置知识点1、NIO首先我们需要回顾一...【详细内容】
2022-07-06  架构师jickly    Tags:聊天系统   点击:(16)  评论:(0)  加入收藏
1.事件流事件流是对事件执行过程的描述,了解事件的执行过程有助于加深对事件的理解,提升开发实践中对事件运用的灵活度。2.捕获和冒泡捕获阶段是【从父到子】的传导过程,冒泡阶...【详细内容】
2022-07-06  金乾坤    Tags:API   点击:(13)  评论:(0)  加入收藏
刷盘策略CommitLog的asyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略:public class CommitLog { public CompletableFuture<PutMe...【详细内容】
2022-07-06  Java码农之路    Tags:RocketMQ   点击:(16)  评论:(0)  加入收藏
最近读了本好书-《深度学习推荐系统》,读完不觉全身通畅,于是就有了写这篇文章的想法,把自己的理解和总结分享给大家。 本文将按照从算法到工程的顺序,先介绍一下推荐系统整体...【详细内容】
2022-07-05  InfoQ    Tags:推荐系统   点击:(22)  评论:(0)  加入收藏
相关文章
    无相关信息
站内最新
站内热门
站内头条