Linux的io机制

时间:2025-01-11 18:04:50

Linux的io机制

Buffered-IO 和Direct-IO

Linux磁盘I/O分为Buffered IO和Direct IO,这两者有何区别呢?

对于Buffered IO:

当应用程序尝试读取某块数据的时候,如果这块数据已经存放在了页缓存(page cache)中,那么这块数据就可以立即返回给应用程序,而不需要经过实际的物理读盘操作。当然,如果数据在应用程序读取之前并未被存放在页缓存中,那么就需要先将数据从磁盘读到页缓存中去。对于写操作来说,应用程序也会将数据先写到页缓存中去,数据是否被立即写到磁盘上去取决于应用程序所采用的写操作机制:如果用户采用的是同步写机制( synchronous writes ),那么数据会立即被写回到磁盘上,应用程序会一直等到数据被写完为止;如果用户采用的是延迟写机制( deferred writes ),那么应用程序就完全不需要等到数据全部被写回到磁盘,数据只要被写到页缓存中去就可以了。在延迟写机制的情况下,操作系统会定期地将放在页缓存中的数据刷到磁盘上。与异步写机制( asynchronous writes )不同的是,延迟写机制在数据完全写到磁盘上的时候不会通知应用程序,而异步写机制在数据完全写到磁盘上的时候是会返回给应用程序的。所以延迟写机制本身是存在数据丢失的风险的,而异步写机制则不会有这方面的担心。

总结下,Buffered IO的特点是使用了内存缓存,如:

  • 读操作:硬盘->内核页缓存->用户缓冲区
  • 写操作:用户缓冲区->内核页缓存->硬盘

对Buffered IO,数据在传输过程中需要在应用程序地址空间和页缓存之间进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

对于某些特殊的应用程序(如数据库)来说,避开操作系统内核缓冲区而直接在应用程序地址空间和磁盘之间传输数据会比使用操作系统内核缓冲区获取更好的性能。

Direct-io的目的在于绕过文件系统(ext)的cache,直接对block设备上的文件进行读写。但不经内核缓冲区,直接写磁盘,必然会引起阻塞。所以通常DIRECT-io与AIO(异步IO)会一起出现。

阻塞模式的IO过程如下:

int fd = open(const char *pathname, int flags, mode_t mode);
ssize_t pread(int fd, void *buf, size_t count, off_t offset);
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset);
int close(int fd);

因为整个过程会等待read/write的返回,所以不需要任何额外的数据结构。但异步IO的思想是:应用程序不能阻塞在昂贵的系统调用上让CPU睡大觉,而是将IO操作抽象成一个个的任务单元提交给内核,内核完成IO任务后将结果放在应用程序可以取到的地方。这样在底层做I/O的这段时间内,CPU可以去干其他的计算任务。但异步的IO任务批量的提交和完成,必须有自身可描述的结构,最重要的两个就是iocb和io_event。

libaio中的structs:

struct iocb {        // 描述IO请求

        void     *data;  /* Return in the io completion event */
unsigned key; /*r use in identifying io requests */
short aio_lio_opcode;  // 操作的类型:IO_CMD_PWRITE | IO_CMD_PREAD
short aio_reqprio;
int aio_fildes;    // 操作的文件fd
union {
struct io_iocb_common c;
struct io_iocb_vector v;
struct io_iocb_poll poll;
struct io_iocb_sockaddr saddr;
} u;
}; struct io_iocb_common {  
void *buf;  
unsigned long nbytes;
long long offset;
unsigned flags;
unsigned resfd;
};   struct io_event {    // 描述返回结果     void *data;     struct iocb *obj;  // 提交的任务     unsigned long res;  // IO任务完成的状态    unsigned long res2;  // 同上
};

libaio提供的API

libaio提供的API有:io_setup, io_submit, io_getevents, io_destroy。

1. 建立IO任务

int io_setup (int maxevents, io_context_t *ctxp);

io_context_t对应内核中一个结构,为异步IO请求提供上下文环境。注意在setup前必须将io_context_t初始化为0。

当然,这里也需要open需要操作的文件,注意设置O_DIRECT标志。

2.提交IO任务

long io_submit (aio_context_t ctx_id, long nr, struct iocb **iocbpp);

提交任务之前必须先填充iocb结构体,libaio提供的包装函数说明了需要完成的工作:

void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
  memset(iocb, , sizeof(*iocb));
  iocb->aio_fildes = fd;
  iocb->aio_lio_opcode = IO_CMD_PREAD;
  iocb->aio_reqprio = ;
  iocb->u.c.buf = buf;
  iocb->u.c.nbytes = count;
  iocb->u.c.offset = offset;
} void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
  memset(iocb, , sizeof(*iocb));
  iocb->aio_fildes = fd;
  iocb->aio_lio_opcode = IO_CMD_PWRITE;
  iocb->aio_reqprio = ;
  iocb->u.c.buf = buf;
  iocb->u.c.nbytes = count;
  iocb->u.c.offset = offset;
}

这里注意读写的buf都必须是按扇区对齐的,可以用posix_memalign来分配。

3.获取完成的IO

long io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

这里最重要的就是提供一个io_event数组给内核来copy完成的IO请求到这里,数组的大小是io_setup时指定的maxevents。

timeout是指等待IO完成的超时时间,设置为NULL表示一直等待所有到IO的完成。

4.销毁IO任务

int io_destroy (io_context_t ctx);

libaio和epoll的结合

在异步编程中,任何一个环节的阻塞都会导致整个程序的阻塞,所以一定要避免在io_getevents调用时阻塞式的等待。还记得io_iocb_common中的flags和resfd吗?看看libaio是如何提供io_getevents和事件循环的结合:

void io_set_eventfd(struct iocb *iocb, int eventfd)

{
iocb->u.c.flags |= ( << ) /* IOCB_FLAG_RESFD */;
iocb->u.c.resfd = eventfd;
}

这里的resfd是通过系统调用eventfd生成的。

int eventfd(unsigned int initval, int flags);

eventfd是linux 2.6.22内核之后加进来的syscall,作用是内核用来通知应用程序发生的事件的数量,从而使应用程序不用频繁地去轮询内核是否有时间发生,而是有内核将发生事件的数量写入到该fd,应用程序发现fd可读后,从fd读取该数值,并马上去内核读取。

有了eventfd,就可以很好地将libaio和epoll事件循环结合起来:

1. 创建一个eventfd

efd = eventfd(, EFD_NONBLOCK | EFD_CLOEXEC);

2. 将eventfd设置到iocb中

io_set_eventfd(iocb, efd);

3. 交接AIO请求

io_submit(ctx, NUM_EVENTS, iocb);

4. 创建一个epollfd,并将eventfd加到epoll中

epfd = epoll_create();

epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);

epoll_wait(epfd, &epevent, , -);

5. 当eventfd可读时,从eventfd读出完成IO请求的数量,并调用io_getevents获取这些IO

read(efd, &finished_aio, sizeof(finished_aio);

r = io_getevents(ctx, , NUM_EVENTS, events, &tms);

一个epoll/aio/eventfd结合使用的简单例子:

#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS #include <stdio.h>
#include <errno.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <inttypes.h> #define TEST_FILE "aio_test_file"
#define TEST_FILE_SIZE (127 * 1024)
#define NUM_EVENTS 128
#define ALIGN_SIZE 512
#define RD_WR_SIZE 1024 struct custom_iocb
{
struct iocb iocb;
int nth_request;
}; void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n",
iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
} int main(int argc, char *argv[])
{
int efd, fd, epfd;
io_context_t ctx;
struct timespec tms;
struct io_event events[NUM_EVENTS];
struct custom_iocb iocbs[NUM_EVENTS];
struct iocb *iocbps[NUM_EVENTS];
struct custom_iocb *iocbp;
int i, j, r;
void *buf;
struct epoll_event epevent; efd = eventfd(, EFD_NONBLOCK | EFD_CLOEXEC);
if (efd == -) {
perror("eventfd");
return ;
} fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, );
if (fd == -) {
perror("open");
return ;
}
ftruncate(fd, TEST_FILE_SIZE); ctx = ;
if (io_setup(, &ctx)) {
perror("io_setup");
return ;
} if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
perror("posix_memalign");
return ;
}
printf("buf: %p\n", buf); for (i = , iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
iocbps[i] = &iocbp->iocb;
io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
io_set_eventfd(&iocbp->iocb, efd);
io_set_callback(&iocbp->iocb, aio_callback);
iocbp->nth_request = i + ;
} if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
perror("io_submit");
return ;
} epfd = epoll_create();
if (epfd == -) {
perror("epoll_create");
return ;
} epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
perror("epoll_ctl");
return ;
} i = ;
while (i < NUM_EVENTS) {
uint64_t finished_aio; if (epoll_wait(epfd, &epevent, , -) != ) {
perror("epoll_wait");
return ;
} if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
perror("read");
return ;
} printf("finished io number: %"PRIu64"\n", finished_aio); while (finished_aio > ) {
tms.tv_sec = ;
tms.tv_nsec = ;
r = io_getevents(ctx, , NUM_EVENTS, events, &tms);
if (r > ) {
for (j = ; j < r; ++j) {
((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
}
i += r;
finished_aio -= r;
}
}
} close(epfd);
free(buf);
io_destroy(ctx);
close(fd);
close(efd);
remove(TEST_FILE); return ;
}