diff options
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r-- | fs/io_uring.c | 593 |
1 files changed, 393 insertions, 200 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c index d542f1cf4428..0dadbdbead0f 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -75,7 +75,7 @@ #include "internal.h" -#define IORING_MAX_ENTRIES 4096 +#define IORING_MAX_ENTRIES 32768 #define IORING_MAX_FIXED_FILES 1024 struct io_uring { @@ -84,27 +84,29 @@ struct io_uring { }; /* - * This data is shared with the application through the mmap at offset - * IORING_OFF_SQ_RING. + * This data is shared with the application through the mmap at offsets + * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING. * * The offsets to the member fields are published through struct * io_sqring_offsets when calling io_uring_setup. */ -struct io_sq_ring { +struct io_rings { /* * Head and tail offsets into the ring; the offsets need to be * masked to get valid indices. * - * The kernel controls head and the application controls tail. + * The kernel controls head of the sq ring and the tail of the cq ring, + * and the application controls tail of the sq ring and the head of the + * cq ring. */ - struct io_uring r; + struct io_uring sq, cq; /* - * Bitmask to apply to head and tail offsets (constant, equals + * Bitmasks to apply to head and tail offsets (constant, equals * ring_entries - 1) */ - u32 ring_mask; - /* Ring size (constant, power of 2) */ - u32 ring_entries; + u32 sq_ring_mask, cq_ring_mask; + /* Ring sizes (constant, power of 2) */ + u32 sq_ring_entries, cq_ring_entries; /* * Number of invalid entries dropped by the kernel due to * invalid index stored in array @@ -117,7 +119,7 @@ struct io_sq_ring { * counter includes all submissions that were dropped reaching * the new SQ head (and possibly more). */ - u32 dropped; + u32 sq_dropped; /* * Runtime flags * @@ -127,43 +129,7 @@ struct io_sq_ring { * The application needs a full memory barrier before checking * for IORING_SQ_NEED_WAKEUP after updating the sq tail. */ - u32 flags; - /* - * Ring buffer of indices into array of io_uring_sqe, which is - * mmapped by the application using the IORING_OFF_SQES offset. - * - * This indirection could e.g. be used to assign fixed - * io_uring_sqe entries to operations and only submit them to - * the queue when needed. - * - * The kernel modifies neither the indices array nor the entries - * array. - */ - u32 array[]; -}; - -/* - * This data is shared with the application through the mmap at offset - * IORING_OFF_CQ_RING. - * - * The offsets to the member fields are published through struct - * io_cqring_offsets when calling io_uring_setup. - */ -struct io_cq_ring { - /* - * Head and tail offsets into the ring; the offsets need to be - * masked to get valid indices. - * - * The application controls head and the kernel tail. - */ - struct io_uring r; - /* - * Bitmask to apply to head and tail offsets (constant, equals - * ring_entries - 1) - */ - u32 ring_mask; - /* Ring size (constant, power of 2) */ - u32 ring_entries; + u32 sq_flags; /* * Number of completion events lost because the queue was full; * this should be avoided by the application by making sure @@ -177,7 +143,7 @@ struct io_cq_ring { * As completion events come in out of order this counter is not * ordered with any other data. */ - u32 overflow; + u32 cq_overflow; /* * Ring buffer of completion events. * @@ -185,7 +151,7 @@ struct io_cq_ring { * produced, so the application is allowed to modify pending * entries. */ - struct io_uring_cqe cqes[]; + struct io_uring_cqe cqes[] ____cacheline_aligned_in_smp; }; struct io_mapped_ubuf { @@ -201,7 +167,7 @@ struct async_list { struct list_head list; struct file *file; - off_t io_end; + off_t io_start; size_t io_len; }; @@ -215,8 +181,18 @@ struct io_ring_ctx { bool compat; bool account_mem; - /* SQ ring */ - struct io_sq_ring *sq_ring; + /* + * Ring buffer of indices into array of io_uring_sqe, which is + * mmapped by the application using the IORING_OFF_SQES offset. + * + * This indirection could e.g. be used to assign fixed + * io_uring_sqe entries to operations and only submit them to + * the queue when needed. + * + * The kernel modifies neither the indices array nor the entries + * array. + */ + u32 *sq_array; unsigned cached_sq_head; unsigned sq_entries; unsigned sq_mask; @@ -227,15 +203,13 @@ struct io_ring_ctx { } ____cacheline_aligned_in_smp; /* IO offload */ - struct workqueue_struct *sqo_wq; + struct workqueue_struct *sqo_wq[2]; struct task_struct *sqo_thread; /* if using sq thread polling */ struct mm_struct *sqo_mm; wait_queue_head_t sqo_wait; struct completion sqo_thread_started; struct { - /* CQ ring */ - struct io_cq_ring *cq_ring; unsigned cached_cq_tail; unsigned cq_entries; unsigned cq_mask; @@ -244,6 +218,8 @@ struct io_ring_ctx { struct eventfd_ctx *cq_ev_fd; } ____cacheline_aligned_in_smp; + struct io_rings *rings; + /* * If used, fixed file set. Writers must ensure that ->refs is dead, * readers must ensure that ->refs is alive as long as the file* is @@ -288,6 +264,7 @@ struct io_ring_ctx { struct sqe_submit { const struct io_uring_sqe *sqe; unsigned short index; + u32 sequence; bool has_user; bool needs_lock; bool needs_fixed_file; @@ -335,6 +312,7 @@ struct io_kiocb { #define REQ_F_LINK 64 /* linked sqes */ #define REQ_F_LINK_DONE 128 /* linked sqes done */ #define REQ_F_FAIL_LINK 256 /* fail rest of links */ +#define REQ_F_SHADOW_DRAIN 512 /* link-drain shadow req */ u64 user_data; u32 result; u32 sequence; @@ -366,6 +344,7 @@ struct io_submit_state { }; static void io_sq_wq_submit_work(struct work_struct *work); +static void __io_free_req(struct io_kiocb *req); static struct kmem_cache *req_cachep; @@ -430,7 +409,7 @@ static inline bool io_sequence_defer(struct io_ring_ctx *ctx, if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) return false; - return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped; + return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped; } static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) @@ -451,11 +430,11 @@ static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) static void __io_commit_cqring(struct io_ring_ctx *ctx) { - struct io_cq_ring *ring = ctx->cq_ring; + struct io_rings *rings = ctx->rings; - if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) { + if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) { /* order cqe stores with ring update */ - smp_store_release(&ring->r.tail, ctx->cached_cq_tail); + smp_store_release(&rings->cq.tail, ctx->cached_cq_tail); if (wq_has_sleeper(&ctx->cq_wait)) { wake_up_interruptible(&ctx->cq_wait); @@ -464,6 +443,24 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx) } } +static inline void io_queue_async_work(struct io_ring_ctx *ctx, + struct io_kiocb *req) +{ + int rw; + + switch (req->submit.sqe->opcode) { + case IORING_OP_WRITEV: + case IORING_OP_WRITE_FIXED: + rw = !(req->rw.ki_flags & IOCB_DIRECT); + break; + default: + rw = 0; + break; + } + + queue_work(ctx->sqo_wq[rw], &req->work); +} + static void io_commit_cqring(struct io_ring_ctx *ctx) { struct io_kiocb *req; @@ -471,14 +468,19 @@ static void io_commit_cqring(struct io_ring_ctx *ctx) __io_commit_cqring(ctx); while ((req = io_get_deferred_req(ctx)) != NULL) { + if (req->flags & REQ_F_SHADOW_DRAIN) { + /* Just for drain, free it. */ + __io_free_req(req); + continue; + } req->flags |= REQ_F_IO_DRAINED; - queue_work(ctx->sqo_wq, &req->work); + io_queue_async_work(ctx, req); } } static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) { - struct io_cq_ring *ring = ctx->cq_ring; + struct io_rings *rings = ctx->rings; unsigned tail; tail = ctx->cached_cq_tail; @@ -487,11 +489,11 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) * control dependency is enough as we're using WRITE_ONCE to * fill the cq entry */ - if (tail - READ_ONCE(ring->r.head) == ring->ring_entries) + if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries) return NULL; ctx->cached_cq_tail++; - return &ring->cqes[tail & ctx->cq_mask]; + return &rings->cqes[tail & ctx->cq_mask]; } static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, @@ -510,9 +512,9 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, WRITE_ONCE(cqe->res, res); WRITE_ONCE(cqe->flags, 0); } else { - unsigned overflow = READ_ONCE(ctx->cq_ring->overflow); + unsigned overflow = READ_ONCE(ctx->rings->cq_overflow); - WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1); + WRITE_ONCE(ctx->rings->cq_overflow, overflow + 1); } } @@ -635,7 +637,7 @@ static void io_req_link_next(struct io_kiocb *req) nxt->flags |= REQ_F_LINK_DONE; INIT_WORK(&nxt->work, io_sq_wq_submit_work); - queue_work(req->ctx->sqo_wq, &nxt->work); + io_queue_async_work(req->ctx, nxt); } } @@ -679,6 +681,13 @@ static void io_put_req(struct io_kiocb *req) io_free_req(req); } +static unsigned io_cqring_events(struct io_rings *rings) +{ + /* See comment at the top of this file */ + smp_rmb(); + return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head); +} + /* * Find and free completed poll iocbs */ @@ -771,7 +780,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events, long min) { - while (!list_empty(&ctx->poll_list)) { + while (!list_empty(&ctx->poll_list) && !need_resched()) { int ret; ret = io_do_iopoll(ctx, nr_events, min); @@ -798,6 +807,12 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx) unsigned int nr_events = 0; io_iopoll_getevents(ctx, &nr_events, 1); + + /* + * Ensure we allow local-to-the-cpu processing to take place, + * in this case we need to ensure that we reap all events. + */ + cond_resched(); } mutex_unlock(&ctx->uring_lock); } @@ -805,11 +820,42 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx) static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, long min) { - int ret = 0; + int iters, ret = 0; + /* + * We disallow the app entering submit/complete with polling, but we + * still need to lock the ring to prevent racing with polled issue + * that got punted to a workqueue. + */ + mutex_lock(&ctx->uring_lock); + + iters = 0; do { int tmin = 0; + /* + * Don't enter poll loop if we already have events pending. + * If we do, we can potentially be spinning for commands that + * already triggered a CQE (eg in error). + */ + if (io_cqring_events(ctx->rings)) + break; + + /* + * If a submit got punted to a workqueue, we can have the + * application entering polling for a command before it gets + * issued. That app will hold the uring_lock for the duration + * of the poll right here, so we need to take a breather every + * now and then to ensure that the issue has a chance to add + * the poll to the issued list. Otherwise we can spin here + * forever, while the workqueue is stuck trying to acquire the + * very same mutex. + */ + if (!(++iters & 7)) { + mutex_unlock(&ctx->uring_lock); + mutex_lock(&ctx->uring_lock); + } + if (*nr_events < min) tmin = min - *nr_events; @@ -819,6 +865,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, ret = 0; } while (min && !*nr_events && !need_resched()); + mutex_unlock(&ctx->uring_lock); return ret; } @@ -1097,10 +1144,8 @@ static int io_import_fixed(struct io_ring_ctx *ctx, int rw, iter->bvec = bvec + seg_skip; iter->nr_segs -= seg_skip; - iter->count -= (seg_skip << PAGE_SHIFT); + iter->count -= bvec->bv_len + offset; iter->iov_offset = offset & ~PAGE_MASK; - if (iter->iov_offset) - iter->count -= iter->iov_offset; } } @@ -1144,6 +1189,28 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw, return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); } +static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb) +{ + if (al->file == kiocb->ki_filp) { + off_t start, end; + + /* + * Allow merging if we're anywhere in the range of the same + * page. Generally this happens for sub-page reads or writes, + * and it's beneficial to allow the first worker to bring the + * page in and the piggy backed work can then work on the + * cached page. + */ + start = al->io_start & PAGE_MASK; + end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK; + if (kiocb->ki_pos >= start && kiocb->ki_pos <= end) + return true; + } + + al->file = NULL; + return false; +} + /* * Make a note of the last file/offset/direction we punted to async * context. We'll use this information to see if we can piggy back a @@ -1155,9 +1222,8 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) struct async_list *async_list = &req->ctx->pending_async[rw]; struct kiocb *kiocb = &req->rw; struct file *filp = kiocb->ki_filp; - off_t io_end = kiocb->ki_pos + len; - if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) { + if (io_should_merge(async_list, kiocb)) { unsigned long max_bytes; /* Use 8x RA size as a decent limiter for both reads/writes */ @@ -1170,17 +1236,16 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) req->flags |= REQ_F_SEQ_PREV; async_list->io_len += len; } else { - io_end = 0; - async_list->io_len = 0; + async_list->file = NULL; } } /* New file? Reset state. */ if (async_list->file != filp) { - async_list->io_len = 0; + async_list->io_start = kiocb->ki_pos; + async_list->io_len = len; async_list->file = filp; } - async_list->io_end = io_end; } static int io_read(struct io_kiocb *req, const struct sqe_submit *s, @@ -1492,7 +1557,7 @@ static void io_poll_remove_one(struct io_kiocb *req) WRITE_ONCE(poll->canceled, true); if (!list_empty(&poll->wait.entry)) { list_del_init(&poll->wait.entry); - queue_work(req->ctx->sqo_wq, &req->work); + io_queue_async_work(req->ctx, req); } spin_unlock(&poll->head->lock); @@ -1606,7 +1671,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, io_cqring_ev_posted(ctx); io_put_req(req); } else { - queue_work(ctx->sqo_wq, &req->work); + io_queue_async_work(ctx, req); } return 1; @@ -1949,7 +2014,7 @@ out: */ static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) { - bool ret = false; + bool ret; if (!list) return false; @@ -1995,10 +2060,14 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, flags = READ_ONCE(s->sqe->flags); fd = READ_ONCE(s->sqe->fd); - if (flags & IOSQE_IO_DRAIN) { + if (flags & IOSQE_IO_DRAIN) req->flags |= REQ_F_IO_DRAIN; - req->sequence = ctx->cached_sq_head - 1; - } + /* + * All io need record the previous position, if LINK vs DARIN, + * it can be used to mark the position of the first IO in the + * link list. + */ + req->sequence = s->sequence; if (!io_op_needs_file(s->sqe)) return 0; @@ -2020,12 +2089,12 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, return 0; } -static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, - struct sqe_submit *s) +static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, + struct sqe_submit *s, bool force_nonblock) { int ret; - ret = __io_submit_sqe(ctx, req, s, true); + ret = __io_submit_sqe(ctx, req, s, force_nonblock); if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) { struct io_uring_sqe *sqe_copy; @@ -2042,7 +2111,7 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, if (list) atomic_inc(&list->cnt); INIT_WORK(&req->work, io_sq_wq_submit_work); - queue_work(ctx->sqo_wq, &req->work); + io_queue_async_work(ctx, req); } /* @@ -2067,10 +2136,70 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, return ret; } +static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, + struct sqe_submit *s, bool force_nonblock) +{ + int ret; + + ret = io_req_defer(ctx, req, s->sqe); + if (ret) { + if (ret != -EIOCBQUEUED) { + io_free_req(req); + io_cqring_add_event(ctx, s->sqe->user_data, ret); + } + return 0; + } + + return __io_queue_sqe(ctx, req, s, force_nonblock); +} + +static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, + struct sqe_submit *s, struct io_kiocb *shadow, + bool force_nonblock) +{ + int ret; + int need_submit = false; + + if (!shadow) + return io_queue_sqe(ctx, req, s, force_nonblock); + + /* + * Mark the first IO in link list as DRAIN, let all the following + * IOs enter the defer list. all IO needs to be completed before link + * list. + */ + req->flags |= REQ_F_IO_DRAIN; + ret = io_req_defer(ctx, req, s->sqe); + if (ret) { + if (ret != -EIOCBQUEUED) { + io_free_req(req); + io_cqring_add_event(ctx, s->sqe->user_data, ret); + return 0; + } + } else { + /* + * If ret == 0 means that all IOs in front of link io are + * running done. let's queue link head. + */ + need_submit = true; + } + + /* Insert shadow req to defer_list, blocking next IOs */ + spin_lock_irq(&ctx->completion_lock); + list_add_tail(&shadow->list, &ctx->defer_list); + spin_unlock_irq(&ctx->completion_lock); + + if (need_submit) + return __io_queue_sqe(ctx, req, s, force_nonblock); + + return 0; +} + #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK) static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, - struct io_submit_state *state, struct io_kiocb **link) + struct io_submit_state *state, struct io_kiocb **link, + bool force_nonblock) { struct io_uring_sqe *sqe_copy; struct io_kiocb *req; @@ -2097,13 +2226,6 @@ err: return; } - ret = io_req_defer(ctx, req, s->sqe); - if (ret) { - if (ret != -EIOCBQUEUED) - goto err_req; - return; - } - /* * If we already have a head request, queue this one for async * submittal once the head completes. If we don't have a head but @@ -2130,7 +2252,7 @@ err: INIT_LIST_HEAD(&req->link_list); *link = req; } else { - io_queue_sqe(ctx, req, s); + io_queue_sqe(ctx, req, s, force_nonblock); } } @@ -2160,15 +2282,15 @@ static void io_submit_state_start(struct io_submit_state *state, static void io_commit_sqring(struct io_ring_ctx *ctx) { - struct io_sq_ring *ring = ctx->sq_ring; + struct io_rings *rings = ctx->rings; - if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) { + if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) { /* * Ensure any loads from the SQEs are done at this point, * since once we write the new head, the application could * write new data to them. */ - smp_store_release(&ring->r.head, ctx->cached_sq_head); + smp_store_release(&rings->sq.head, ctx->cached_sq_head); } } @@ -2182,7 +2304,8 @@ static void io_commit_sqring(struct io_ring_ctx *ctx) */ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) { - struct io_sq_ring *ring = ctx->sq_ring; + struct io_rings *rings = ctx->rings; + u32 *sq_array = ctx->sq_array; unsigned head; /* @@ -2195,20 +2318,21 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) */ head = ctx->cached_sq_head; /* make sure SQ entry isn't read before tail */ - if (head == smp_load_acquire(&ring->r.tail)) + if (head == smp_load_acquire(&rings->sq.tail)) return false; - head = READ_ONCE(ring->array[head & ctx->sq_mask]); + head = READ_ONCE(sq_array[head & ctx->sq_mask]); if (head < ctx->sq_entries) { s->index = head; s->sqe = &ctx->sq_sqes[head]; + s->sequence = ctx->cached_sq_head; ctx->cached_sq_head++; return true; } /* drop invalid entries */ ctx->cached_sq_head++; - ring->dropped++; + rings->sq_dropped++; return false; } @@ -2217,6 +2341,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, { struct io_submit_state state, *statep = NULL; struct io_kiocb *link = NULL; + struct io_kiocb *shadow_req = NULL; bool prev_was_link = false; int i, submitted = 0; @@ -2231,11 +2356,21 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, * that's the end of the chain. Submit the previous link. */ if (!prev_was_link && link) { - io_queue_sqe(ctx, link, &link->submit); + io_queue_link_head(ctx, link, &link->submit, shadow_req, + true); link = NULL; } prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0; + if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) { + if (!shadow_req) { + shadow_req = io_get_req(ctx, NULL); + shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); + refcount_dec(&shadow_req->refs); + } + shadow_req->sequence = sqes[i].sequence; + } + if (unlikely(mm_fault)) { io_cqring_add_event(ctx, sqes[i].sqe->user_data, -EFAULT); @@ -2243,13 +2378,13 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, sqes[i].has_user = has_user; sqes[i].needs_lock = true; sqes[i].needs_fixed_file = true; - io_submit_sqe(ctx, &sqes[i], statep, &link); + io_submit_sqe(ctx, &sqes[i], statep, &link, true); submitted++; } } if (link) - io_queue_sqe(ctx, link, &link->submit); + io_queue_link_head(ctx, link, &link->submit, shadow_req, true); if (statep) io_submit_state_end(&state); @@ -2280,15 +2415,7 @@ static int io_sq_thread(void *data) unsigned nr_events = 0; if (ctx->flags & IORING_SETUP_IOPOLL) { - /* - * We disallow the app entering submit/complete - * with polling, but we still need to lock the - * ring to prevent racing with polled issue - * that got punted to a workqueue. - */ - mutex_lock(&ctx->uring_lock); io_iopoll_check(ctx, &nr_events, 0); - mutex_unlock(&ctx->uring_lock); } else { /* * Normal IO, just pretend everything completed. @@ -2329,7 +2456,7 @@ static int io_sq_thread(void *data) TASK_INTERRUPTIBLE); /* Tell userspace we may need a wakeup call */ - ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; + ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP; /* make sure to read SQ tail after writing flags */ smp_mb(); @@ -2343,12 +2470,12 @@ static int io_sq_thread(void *data) schedule(); finish_wait(&ctx->sqo_wait, &wait); - ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; + ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP; continue; } finish_wait(&ctx->sqo_wait, &wait); - ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; + ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP; } i = 0; @@ -2389,10 +2516,12 @@ static int io_sq_thread(void *data) return 0; } -static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) +static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit, + bool block_for_last) { struct io_submit_state state, *statep = NULL; struct io_kiocb *link = NULL; + struct io_kiocb *shadow_req = NULL; bool prev_was_link = false; int i, submit = 0; @@ -2402,6 +2531,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) } for (i = 0; i < to_submit; i++) { + bool force_nonblock = true; struct sqe_submit s; if (!io_get_sqring(ctx, &s)) @@ -2412,34 +2542,49 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) * that's the end of the chain. Submit the previous link. */ if (!prev_was_link && link) { - io_queue_sqe(ctx, link, &link->submit); + io_queue_link_head(ctx, link, &link->submit, shadow_req, + force_nonblock); link = NULL; } prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; + if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) { + if (!shadow_req) { + shadow_req = io_get_req(ctx, NULL); + shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); + refcount_dec(&shadow_req->refs); + } + shadow_req->sequence = s.sequence; + } + s.has_user = true; s.needs_lock = false; s.needs_fixed_file = false; submit++; - io_submit_sqe(ctx, &s, statep, &link); + + /* + * The caller will block for events after submit, submit the + * last IO non-blocking. This is either the only IO it's + * submitting, or it already submitted the previous ones. This + * improves performance by avoiding an async punt that we don't + * need to do. + */ + if (block_for_last && submit == to_submit) + force_nonblock = false; + + io_submit_sqe(ctx, &s, statep, &link, force_nonblock); } io_commit_sqring(ctx); if (link) - io_queue_sqe(ctx, link, &link->submit); + io_queue_link_head(ctx, link, &link->submit, shadow_req, + block_for_last); if (statep) io_submit_state_end(statep); return submit; } -static unsigned io_cqring_events(struct io_cq_ring *ring) -{ - /* See comment at the top of this file */ - smp_rmb(); - return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head); -} - /* * Wait until events become available, if we don't already have some. The * application must reap them itself, as they reside on the shared cq ring. @@ -2447,10 +2592,10 @@ static unsigned io_cqring_events(struct io_cq_ring *ring) static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, const sigset_t __user *sig, size_t sigsz) { - struct io_cq_ring *ring = ctx->cq_ring; + struct io_rings *rings = ctx->rings; int ret; - if (io_cqring_events(ring) >= min_events) + if (io_cqring_events(rings) >= min_events) return 0; if (sig) { @@ -2466,12 +2611,12 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, return ret; } - ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events); + ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events); restore_saved_sigmask_unless(ret == -ERESTARTSYS); if (ret == -ERESTARTSYS) ret = -EINTR; - return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0; + return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0; } static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) @@ -2521,11 +2666,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx) static void io_finish_async(struct io_ring_ctx *ctx) { + int i; + io_sq_thread_stop(ctx); - if (ctx->sqo_wq) { - destroy_workqueue(ctx->sqo_wq); - ctx->sqo_wq = NULL; + for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) { + if (ctx->sqo_wq[i]) { + destroy_workqueue(ctx->sqo_wq[i]); + ctx->sqo_wq[i] = NULL; + } } } @@ -2733,16 +2882,31 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx, } /* Do QD, or 2 * CPUS, whatever is smallest */ - ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE, + ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq", + WQ_UNBOUND | WQ_FREEZABLE, min(ctx->sq_entries - 1, 2 * num_online_cpus())); - if (!ctx->sqo_wq) { + if (!ctx->sqo_wq[0]) { + ret = -ENOMEM; + goto err; + } + + /* + * This is for buffered writes, where we want to limit the parallelism + * due to file locking in file systems. As "normal" buffered writes + * should parellelize on writeout quite nicely, limit us to having 2 + * pending. This avoids massive contention on the inode when doing + * buffered async writes. + */ + ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq", + WQ_UNBOUND | WQ_FREEZABLE, 2); + if (!ctx->sqo_wq[1]) { ret = -ENOMEM; goto err; } return 0; err: - io_sq_thread_stop(ctx); + io_finish_async(ctx); mmdrop(ctx->sqo_mm); ctx->sqo_mm = NULL; return ret; @@ -2791,17 +2955,45 @@ static void *io_mem_alloc(size_t size) return (void *) __get_free_pages(gfp_flags, get_order(size)); } +static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries, + size_t *sq_offset) +{ + struct io_rings *rings; + size_t off, sq_array_size; + + off = struct_size(rings, cqes, cq_entries); + if (off == SIZE_MAX) + return SIZE_MAX; + +#ifdef CONFIG_SMP + off = ALIGN(off, SMP_CACHE_BYTES); + if (off == 0) + return SIZE_MAX; +#endif + + sq_array_size = array_size(sizeof(u32), sq_entries); + if (sq_array_size == SIZE_MAX) + return SIZE_MAX; + + if (check_add_overflow(off, sq_array_size, &off)) + return SIZE_MAX; + + if (sq_offset) + *sq_offset = off; + + return off; +} + static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries) { - struct io_sq_ring *sq_ring; - struct io_cq_ring *cq_ring; - size_t bytes; + size_t pages; - bytes = struct_size(sq_ring, array, sq_entries); - bytes += array_size(sizeof(struct io_uring_sqe), sq_entries); - bytes += struct_size(cq_ring, cqes, cq_entries); + pages = (size_t)1 << get_order( + rings_size(sq_entries, cq_entries, NULL)); + pages += (size_t)1 << get_order( + array_size(sizeof(struct io_uring_sqe), sq_entries)); - return (bytes + PAGE_SIZE - 1) / PAGE_SIZE; + return pages; } static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) @@ -2815,7 +3007,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; for (j = 0; j < imu->nr_bvecs; j++) - put_page(imu->bvec[j].bv_page); + put_user_page(imu->bvec[j].bv_page); if (ctx->account_mem) io_unaccount_mem(ctx->user, imu->nr_bvecs); @@ -2959,10 +3151,8 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, * if we did partial map, or found file backed vmas, * release any pages we did get */ - if (pret > 0) { - for (j = 0; j < pret; j++) - put_page(pages[j]); - } + if (pret > 0) + put_user_pages(pages, pret); if (ctx->account_mem) io_unaccount_mem(ctx->user, nr_pages); kvfree(imu->bvec); @@ -3048,9 +3238,8 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) } #endif - io_mem_free(ctx->sq_ring); + io_mem_free(ctx->rings); io_mem_free(ctx->sq_sqes); - io_mem_free(ctx->cq_ring); percpu_ref_exit(&ctx->refs); if (ctx->account_mem) @@ -3071,10 +3260,10 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait) * io_commit_cqring */ smp_rmb(); - if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head != - ctx->sq_ring->ring_entries) + if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head != + ctx->rings->sq_ring_entries) mask |= EPOLLOUT | EPOLLWRNORM; - if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail) + if (READ_ONCE(ctx->rings->sq.head) != ctx->cached_cq_tail) mask |= EPOLLIN | EPOLLRDNORM; return mask; @@ -3119,14 +3308,12 @@ static int io_uring_mmap(struct file *file, struct vm_area_struct *vma) switch (offset) { case IORING_OFF_SQ_RING: - ptr = ctx->sq_ring; + case IORING_OFF_CQ_RING: + ptr = ctx->rings; break; case IORING_OFF_SQES: ptr = ctx->sq_sqes; break; - case IORING_OFF_CQ_RING: - ptr = ctx->cq_ring; - break; default: return -EINVAL; } @@ -3169,19 +3356,27 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, * Just return the requested submit count, and wake the thread if * we were asked to. */ + ret = 0; if (ctx->flags & IORING_SETUP_SQPOLL) { if (flags & IORING_ENTER_SQ_WAKEUP) wake_up(&ctx->sqo_wait); submitted = to_submit; - goto out_ctx; - } + } else if (to_submit) { + bool block_for_last = false; - ret = 0; - if (to_submit) { to_submit = min(to_submit, ctx->sq_entries); + /* + * Allow last submission to block in a series, IFF the caller + * asked to wait for events and we don't currently have + * enough. This potentially avoids an async punt. + */ + if (to_submit == min_complete && + io_cqring_events(ctx->rings) < min_complete) + block_for_last = true; + mutex_lock(&ctx->uring_lock); - submitted = io_ring_submit(ctx, to_submit); + submitted = io_ring_submit(ctx, to_submit, block_for_last); mutex_unlock(&ctx->uring_lock); } if (flags & IORING_ENTER_GETEVENTS) { @@ -3190,15 +3385,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, min_complete = min(min_complete, ctx->cq_entries); if (ctx->flags & IORING_SETUP_IOPOLL) { - mutex_lock(&ctx->uring_lock); ret = io_iopoll_check(ctx, &nr_events, min_complete); - mutex_unlock(&ctx->uring_lock); } else { ret = io_cqring_wait(ctx, min_complete, sig, sigsz); } } -out_ctx: io_ring_drop_ctx_refs(ctx, 1); out_fput: fdput(f); @@ -3215,19 +3407,27 @@ static const struct file_operations io_uring_fops = { static int io_allocate_scq_urings(struct io_ring_ctx *ctx, struct io_uring_params *p) { - struct io_sq_ring *sq_ring; - struct io_cq_ring *cq_ring; - size_t size; + struct io_rings *rings; + size_t size, sq_array_offset; - sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries)); - if (!sq_ring) + size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); + if (size == SIZE_MAX) + return -EOVERFLOW; + + rings = io_mem_alloc(size); + if (!rings) return -ENOMEM; - ctx->sq_ring = sq_ring; - sq_ring->ring_mask = p->sq_entries - 1; - sq_ring->ring_entries = p->sq_entries; - ctx->sq_mask = sq_ring->ring_mask; - ctx->sq_entries = sq_ring->ring_entries; + ctx->rings = rings; + ctx->sq_array = (u32 *)((char *)rings + sq_array_offset); + rings->sq_ring_mask = p->sq_entries - 1; + rings->cq_ring_mask = p->cq_entries - 1; + rings->sq_ring_entries = p->sq_entries; + rings->cq_ring_entries = p->cq_entries; + ctx->sq_mask = rings->sq_ring_mask; + ctx->cq_mask = rings->cq_ring_mask; + ctx->sq_entries = rings->sq_ring_entries; + ctx->cq_entries = rings->cq_ring_entries; size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); if (size == SIZE_MAX) @@ -3237,15 +3437,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, if (!ctx->sq_sqes) return -ENOMEM; - cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries)); - if (!cq_ring) - return -ENOMEM; - - ctx->cq_ring = cq_ring; - cq_ring->ring_mask = p->cq_entries - 1; - cq_ring->ring_entries = p->cq_entries; - ctx->cq_mask = cq_ring->ring_mask; - ctx->cq_entries = cq_ring->ring_entries; return 0; } @@ -3349,21 +3540,23 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p) goto err; memset(&p->sq_off, 0, sizeof(p->sq_off)); - p->sq_off.head = offsetof(struct io_sq_ring, r.head); - p->sq_off.tail = offsetof(struct io_sq_ring, r.tail); - p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask); - p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries); - p->sq_off.flags = offsetof(struct io_sq_ring, flags); - p->sq_off.dropped = offsetof(struct io_sq_ring, dropped); - p->sq_off.array = offsetof(struct io_sq_ring, array); + p->sq_off.head = offsetof(struct io_rings, sq.head); + p->sq_off.tail = offsetof(struct io_rings, sq.tail); + p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask); + p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries); + p->sq_off.flags = offsetof(struct io_rings, sq_flags); + p->sq_off.dropped = offsetof(struct io_rings, sq_dropped); + p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings; memset(&p->cq_off, 0, sizeof(p->cq_off)); - p->cq_off.head = offsetof(struct io_cq_ring, r.head); - p->cq_off.tail = offsetof(struct io_cq_ring, r.tail); - p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask); - p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries); - p->cq_off.overflow = offsetof(struct io_cq_ring, overflow); - p->cq_off.cqes = offsetof(struct io_cq_ring, cqes); + p->cq_off.head = offsetof(struct io_rings, cq.head); + p->cq_off.tail = offsetof(struct io_rings, cq.tail); + p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask); + p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries); + p->cq_off.overflow = offsetof(struct io_rings, cq_overflow); + p->cq_off.cqes = offsetof(struct io_rings, cqes); + + p->features = IORING_FEAT_SINGLE_MMAP; return ret; err: io_ring_ctx_wait_and_kill(ctx); |