The basics and details of Linux io_uring are described in https://kernel.dk/io_uring.pdf. Readers are encouraged to read that document first.
Various io_uring examples are also available here:
https://github.com/shuveb/io_uring-by-example
I am using the 5.3.18-22 kernel in this blog post.
Let's look at a program that reads twenty (20) 512-byte blocks from a block device.
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/uio.h>
#include <linux/fs.h>
/* If your compilation fails because the header file below is missing,
 * your kernel is probably too old to support io_uring.
 */
#include <linux/io_uring.h>
/* Number of entries requested from io_uring_setup() in app_setup_uring(). */
#define QUEUE_DEPTH 1
/* Size and alignment, in bytes, of each read buffer allocated in submit_to_sq(). */
#define BLOCK_SZ 512
/* This is x86 specific */
/*
 * Both macros expand to an empty asm statement with a "memory" clobber,
 * i.e. they only stop the compiler from reordering or caching memory
 * accesses across them; no CPU fence instruction is emitted.
 */
#define read_barrier() __asm__ __volatile__("":::"memory")
#define write_barrier() __asm__ __volatile__("":::"memory")
/*
 * Application-side handle on the submission queue ring.
 *
 * Every member is a pointer into the mmap'd SQ ring region; the addresses
 * are computed in app_setup_uring() from the offsets that io_uring_setup()
 * reports in io_uring_params.sq_off.
 */
struct app_io_sq_ring {
    unsigned *head;          /* located at sq_off.head */
    unsigned *tail;          /* located at sq_off.tail; advanced by the submitter */
    unsigned *ring_mask;     /* located at sq_off.ring_mask; ANDed with an index to wrap it */
    unsigned *ring_entries;  /* located at sq_off.ring_entries */
    unsigned *flags;         /* located at sq_off.flags */
    unsigned *array;         /* located at sq_off.array; slots hold indices into the SQE array */
};
/*
 * Application-side handle on the completion queue ring.
 *
 * Every member is a pointer into the mmap'd CQ ring region; the addresses
 * are computed in app_setup_uring() from the offsets that io_uring_setup()
 * reports in io_uring_params.cq_off.
 */
struct app_io_cq_ring {
    unsigned *head;              /* located at cq_off.head; advanced by read_from_cq() */
    unsigned *tail;              /* located at cq_off.tail; compared with head to detect an empty ring */
    unsigned *ring_mask;         /* located at cq_off.ring_mask; ANDed with an index to wrap it */
    unsigned *ring_entries;      /* located at cq_off.ring_entries */
    struct io_uring_cqe *cqes;   /* located at cq_off.cqes; the completion entries themselves */
};
/*
 * Everything the program needs to talk to one io_uring instance:
 * the ring file descriptor plus the mapped submission/completion state.
 * Filled in by app_setup_uring().
 */
struct submitter {
    int ring_fd;                     /* descriptor returned by io_uring_setup() */
    struct app_io_sq_ring sq_ring;   /* pointers into the mapped SQ ring */
    struct io_uring_sqe *sqes;       /* mapped array of submission queue entries */
    struct app_io_cq_ring cq_ring;   /* pointers into the mapped CQ ring */
};
/*
 * Raw wrapper around the io_uring_setup system call.
 *
 * entries: value forwarded as the first syscall argument.
 * p:       parameter block forwarded as the second syscall argument.
 *
 * Returns the syscall() result narrowed to int.
 */
int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
    long result = syscall(__NR_io_uring_setup, entries, p);

    return (int) result;
}
/*
 * Raw wrapper around the io_uring_enter system call.
 *
 * ring_fd, to_submit, min_complete and flags are forwarded unchanged;
 * the two trailing syscall arguments are always passed as NULL and 0.
 *
 * Returns the syscall() result narrowed to int.
 */
int io_uring_enter(int ring_fd, unsigned int to_submit,
                   unsigned int min_complete, unsigned int flags)
{
    long result = syscall(__NR_io_uring_enter, ring_fd, to_submit,
                          min_complete, flags, NULL, 0);

    return (int) result;
}
/*
 * Bookkeeping for one vectored read request. A pointer to this structure
 * is stored in the SQE's user_data by submit_to_sq() and read back from
 * the CQE in read_from_cq().
 */
struct io_info {
    int num_io;              /* number of elements in iovecs[] */
    struct iovec iovecs[];   /* flexible array: one buffer descriptor per block */
};
/*
 * File size plus a trailing array of buffer descriptors.
 * NOTE(review): nothing in the code shown here references this type.
 */
struct file_info {
    off_t file_sz;           /* size of the file in bytes */
    struct iovec iovecs[];   /* Referred by readv/writev */
};
int submit_to_sq(char *file_path, struct submitter *s) {
struct io_info *ii;
struct stat st;
int file_fd = open(file_path, O_RDONLY);
if (file_fd < 0 ) {
perror("open");
return 1;
}
struct app_io_sq_ring *sring = &s->sq_ring;
unsigned index = 0, tail = 0, next_tail = 0;
off_t file_sz = 0;
// fetch the block device size
if(fstat(file_fd, &st) < 0) {
perror("fstat");
return -1;
}
if (S_ISBLK(st.st_mode)) {
unsigned long long bytes;
if (ioctl(file_fd, BLKGETSIZE64, &bytes) != 0) {
perror("ioctl");
return -1;
}
file_sz = bytes;
} else if (S_ISREG(st.st_mode))
file_sz = st.st_size;
printf("sz = %llu\n", file_sz);
if (file_sz < 0)
return 1;
//Lets submit IO to read first 20 blocks
struct iovec *iovecs;//[20];
ii = malloc(sizeof(*ii) + sizeof(struct iovec) * 20);
memset (ii, 0, sizeof(*ii) + (sizeof(struct iovec) * 20));
iovecs = ii->iovecs;
ii->num_io = 20;
for (int i = 0;i < 20;i++)
{
iovecs[i].iov_len = 512;
void *buf;
if( posix_memalign(&buf, BLOCK_SZ, BLOCK_SZ)) {
perror("posix_memalign");
return 1;
}
iovecs[i].iov_base = buf;
}
/* Add our submission queue entry to the tail of the SQE ring buffer */
read_barrier();
index = tail & *s->sq_ring.ring_mask;
struct io_uring_sqe *sqe = &s->sqes[index];
sqe->fd = file_fd;
sqe->flags = 0;
sqe->opcode = IORING_OP_READV;
sqe->addr = (unsigned long) iovecs;
sqe->len = 20;
sqe->off = 0;
sqe->user_data = (unsigned long long) ii;
sring->array[index] = index;
tail = next_tail;
/* Update the tail so the kernel can see it. */
if(*sring->tail != tail) {
*sring->tail = tail;
write_barrier();
}
int ret = io_uring_enter(s->ring_fd, 1, 1, IORING_ENTER_GETEVENTS);
if(ret < 0) {
perror("io_uring_enter");
return 1;
}
return 0;
}
void read_from_cq(struct submitter *s) {
struct iovec *iovecs;
struct io_info *ii;
struct app_io_cq_ring *cring = &s->cq_ring;
struct io_uring_cqe *cqe;
unsigned head, reaped = 0;
head = *cring->head;
do {
read_barrier();
/*
* Remember, this is a ring buffer. If head == tail, it means that the
* buffer is empty.
* */
if (head == *cring->tail)
break;
/* Get the entry */
cqe = &cring->cqes[head & *s->cq_ring.ring_mask];
ii = (struct io_info*) cqe->user_data;
iovecs = ii->iovecs;
if (cqe->res < 0)
fprintf(stderr, "Error: %s\n", strerror(abs(cqe->res)));
for (int i = 0; i < 20; i++)
{
printf("iov_base = %p, iov_len = %d\n",
iovecs[i].iov_base, iovecs[i].iov_len);
printf("%d, %d\n", *((char *)iovecs[i].iov_base) , *((char *)iovecs[i].iov_base + 1));
}
head++;
} while (1);
cring->head = head;
write_barrier();
}
/*
 * Create an io_uring instance with QUEUE_DEPTH entries and map its three
 * shared regions (SQ ring, CQ ring, SQE array) into this process.
 *
 * s: receives the ring descriptor, the SQE array pointer and the pointers
 *    into both rings. On failure nothing stays mapped, the ring descriptor
 *    is closed and s->ring_fd is set to -1 (or holds the negative
 *    io_uring_setup() result if the setup call itself failed).
 *
 * Returns 0 on success, 1 on failure.
 */
int app_setup_uring(struct submitter *s) {
    struct app_io_sq_ring *sring = &s->sq_ring;
    struct app_io_cq_ring *cring = &s->cq_ring;
    struct io_uring_params p;
    void *sq_ptr = MAP_FAILED;
    void *cq_ptr = MAP_FAILED;
    void *sqes = MAP_FAILED;

    memset(&p, 0, sizeof(p));
    s->ring_fd = io_uring_setup(QUEUE_DEPTH, &p);
    if (s->ring_fd < 0) {
        perror("io_uring_setup");
        return 1;
    }

    /* Region sizes follow from the offsets and entry counts the kernel
     * filled into the parameter block. */
    size_t sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
    size_t cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
    size_t sqes_sz = p.sq_entries * sizeof(struct io_uring_sqe);
    printf("sring_sz = %zu, cring_sz = %zu\n", sring_sz, cring_sz);

    /* Map the submission ring. */
    sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE,
                  MAP_SHARED | MAP_POPULATE,
                  s->ring_fd, IORING_OFF_SQ_RING);
    if (sq_ptr == MAP_FAILED) {
        perror("mmap");
        goto err_close;
    }

    /* Map in the completion queue ring buffer in older kernels separately */
    cq_ptr = mmap(0, cring_sz, PROT_READ | PROT_WRITE,
                  MAP_SHARED | MAP_POPULATE,
                  s->ring_fd, IORING_OFF_CQ_RING);
    if (cq_ptr == MAP_FAILED) {
        perror("mmap");
        goto err_unmap_sq;
    }

    /* Map in the submission queue entries array */
    sqes = mmap(0, sqes_sz, PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_POPULATE,
                s->ring_fd, IORING_OFF_SQES);
    if (sqes == MAP_FAILED) {
        perror("mmap");
        goto err_unmap_cq;
    }
    s->sqes = sqes;

    /* Byte-wise base pointers: arithmetic on void * is a GNU extension. */
    char *sq_base = sq_ptr;
    char *cq_base = cq_ptr;

    /* Save useful fields in the app_io_sq_ring struct for later easy reference */
    sring->head = (unsigned *) (sq_base + p.sq_off.head);
    sring->tail = (unsigned *) (sq_base + p.sq_off.tail);
    sring->ring_mask = (unsigned *) (sq_base + p.sq_off.ring_mask);
    sring->ring_entries = (unsigned *) (sq_base + p.sq_off.ring_entries);
    sring->flags = (unsigned *) (sq_base + p.sq_off.flags);
    sring->array = (unsigned *) (sq_base + p.sq_off.array);

    /* Save useful fields in the app_io_cq_ring struct for later easy reference */
    cring->head = (unsigned *) (cq_base + p.cq_off.head);
    cring->tail = (unsigned *) (cq_base + p.cq_off.tail);
    cring->ring_mask = (unsigned *) (cq_base + p.cq_off.ring_mask);
    cring->ring_entries = (unsigned *) (cq_base + p.cq_off.ring_entries);
    cring->cqes = (struct io_uring_cqe *) (cq_base + p.cq_off.cqes);
    return 0;

err_unmap_cq:
    munmap(cq_ptr, cring_sz);
err_unmap_sq:
    munmap(sq_ptr, sring_sz);
err_close:
    close(s->ring_fd);
    s->ring_fd = -1;
    return 1;
}
/*
 * Entry point: set up the ring, submit one vectored read of the file or
 * block device named by argv[1], then print what the completion queue holds.
 *
 * Returns 0 on success, 1 on a usage error or any setup/submission failure.
 */
int main(int argc, char *argv[]) {
    if (argc < 2) {
        fprintf(stderr, "Usage: %s <file-or-block-device>\n",
                argc > 0 ? argv[0] : "io_uring_read");
        return 1;
    }

    /* calloc gives the same zeroed state the malloc + memset pair did. */
    struct submitter *s = calloc(1, sizeof(*s));
    if (!s) {
        perror("malloc");
        return 1;
    }

    if (app_setup_uring(s)) {
        fprintf(stderr, "Unable to setup uring!\n");
        free(s);
        return 1;
    }
    printf("setup completed\n");

    // Open the block device and read the data
    if (submit_to_sq(argv[1], s)) {
        fprintf(stderr, "Error reading file\n");
        free(s);
        return 1;
    }
    printf("submit cq completed\n");

    read_from_cq(s);
    printf("read cq completed\n");

    free(s);
    return 0;
}
No comments:
Post a Comment