Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r--	io_uring/io_uring.c	223
1 file changed, 132 insertions, 91 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 61cd7ffd0f6a..3436e0b83534 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -167,7 +167,8 @@ EXPORT_SYMBOL(io_uring_get_socket);
 
 static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
 {
-	if (!wq_list_empty(&ctx->submit_state.compl_reqs))
+	if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
+	    ctx->submit_state.cqes_count)
 		__io_submit_flush_completions(ctx);
 }
 
@@ -495,7 +496,7 @@ static void io_eventfd_ops(struct rcu_head *rcu)
 	int ops = atomic_xchg(&ev_fd->ops, 0);
 
 	if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
-		eventfd_signal(ev_fd->cq_ev_fd, 1);
+		eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
 
 	/* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
 	 * ordering in a race but if references are 0 we know we have to free
@@ -531,7 +532,7 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx)
 		goto out;
 
 	if (likely(eventfd_signal_allowed())) {
-		eventfd_signal(ev_fd->cq_ev_fd, 1);
+		eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
 	} else {
 		atomic_inc(&ev_fd->refs);
 		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
@@ -581,23 +582,21 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 		io_eventfd_flush_signal(ctx);
 }
 
-static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
-{
-	io_commit_cqring_flush(ctx);
-	io_cqring_wake(ctx);
-}
-
-static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
+/* keep it inlined for io_submit_flush_completions() */
+static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx)
 	__releases(ctx->completion_lock)
 {
 	io_commit_cqring(ctx);
 	spin_unlock(&ctx->completion_lock);
-	io_cqring_ev_posted(ctx);
+
+	io_commit_cqring_flush(ctx);
+	io_cqring_wake(ctx);
 }
 
 void io_cq_unlock_post(struct io_ring_ctx *ctx)
+	__releases(ctx->completion_lock)
 {
-	__io_cq_unlock_post(ctx);
+	io_cq_unlock_post_inline(ctx);
 }
 
 /* Returns true if there are no backlogged entries after the flush */
@@ -778,11 +777,13 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
 	return &rings->cqes[off];
 }
 
-bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
-		     bool allow_overflow)
+static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
+			    bool allow_overflow)
 {
 	struct io_uring_cqe *cqe;
 
+	lockdep_assert_held(&ctx->completion_lock);
+
 	ctx->cq_extra++;
 
 	/*
@@ -811,9 +812,23 @@ bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags
 	return false;
 }
 
-bool io_post_aux_cqe(struct io_ring_ctx *ctx,
-		     u64 user_data, s32 res, u32 cflags,
-		     bool allow_overflow)
+static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
+	__must_hold(&ctx->uring_lock)
+{
+	struct io_submit_state *state = &ctx->submit_state;
+	unsigned int i;
+
+	lockdep_assert_held(&ctx->uring_lock);
+	for (i = 0; i < state->cqes_count; i++) {
+		struct io_uring_cqe *cqe = &state->cqes[i];
+
+		io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags, true);
+	}
+	state->cqes_count = 0;
+}
+
+static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
+			      bool allow_overflow)
 {
 	bool filled;
 
@@ -823,15 +838,58 @@ bool io_post_aux_cqe(struct io_ring_ctx *ctx,
 	return filled;
 }
 
-static void __io_req_complete_put(struct io_kiocb *req)
+bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
+{
+	return __io_post_aux_cqe(ctx, user_data, res, cflags, true);
+}
+
+bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags,
+		bool allow_overflow)
 {
+	struct io_uring_cqe *cqe;
+	unsigned int length;
+
+	if (!defer)
+		return __io_post_aux_cqe(ctx, user_data, res, cflags, allow_overflow);
+
+	length = ARRAY_SIZE(ctx->submit_state.cqes);
+
+	lockdep_assert_held(&ctx->uring_lock);
+
+	if (ctx->submit_state.cqes_count == length) {
+		io_cq_lock(ctx);
+		__io_flush_post_cqes(ctx);
+		/* no need to flush - flush is deferred */
+		spin_unlock(&ctx->completion_lock);
+	}
+
+	/* For defered completions this is not as strict as it is otherwise,
+	 * however it's main job is to prevent unbounded posted completions,
+	 * and in that it works just as well.
+	 */
+	if (!allow_overflow && test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
+		return false;
+
+	cqe = &ctx->submit_state.cqes[ctx->submit_state.cqes_count++];
+	cqe->user_data = user_data;
+	cqe->res = res;
+	cqe->flags = cflags;
+	return true;
+}
+
+static void __io_req_complete_post(struct io_kiocb *req)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+
+	io_cq_lock(ctx);
+	if (!(req->flags & REQ_F_CQE_SKIP))
+		__io_fill_cqe_req(ctx, req);
+
 	/*
 	 * If we're the last reference to this request, add to our locked
 	 * free_list cache.
 	 */
 	if (req_ref_put_and_test(req)) {
-		struct io_ring_ctx *ctx = req->ctx;
-
 		if (req->flags & IO_REQ_LINK_FLAGS) {
 			if (req->flags & IO_DISARM_MASK)
 				io_disarm_next(req);
@@ -852,38 +910,35 @@ static void __io_req_complete_put(struct io_kiocb *req)
 		wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
 		ctx->locked_free_nr++;
 	}
-}
-
-void __io_req_complete_post(struct io_kiocb *req)
-{
-	if (!(req->flags & REQ_F_CQE_SKIP))
-		__io_fill_cqe_req(req->ctx, req);
-	__io_req_complete_put(req);
-}
-
-void io_req_complete_post(struct io_kiocb *req)
-{
-	struct io_ring_ctx *ctx = req->ctx;
-
-	io_cq_lock(ctx);
-	__io_req_complete_post(req);
 	io_cq_unlock_post(ctx);
 }
 
-inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
+void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
 {
-	io_req_complete_post(req);
+	if (!(issue_flags & IO_URING_F_UNLOCKED) ||
+	    !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
+		__io_req_complete_post(req);
+	} else {
+		struct io_ring_ctx *ctx = req->ctx;
+
+		mutex_lock(&ctx->uring_lock);
+		__io_req_complete_post(req);
+		mutex_unlock(&ctx->uring_lock);
+	}
 }
 
-void io_req_complete_failed(struct io_kiocb *req, s32 res)
+void io_req_defer_failed(struct io_kiocb *req, s32 res)
+	__must_hold(&ctx->uring_lock)
 {
 	const struct io_op_def *def = &io_op_defs[req->opcode];
 
+	lockdep_assert_held(&req->ctx->uring_lock);
+
 	req_set_fail(req);
 	io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
 	if (def->fail)
 		def->fail(req);
-	io_req_complete_post(req);
+	io_req_complete_defer(req);
 }
 
 /*
@@ -1105,6 +1160,20 @@ void tctx_task_work(struct callback_head *cb)
 	trace_io_uring_task_work_run(tctx, count, loops);
 }
 
+static __cold void io_fallback_tw(struct io_uring_task *tctx)
+{
+	struct llist_node *node = llist_del_all(&tctx->task_list);
+	struct io_kiocb *req;
+
+	while (node) {
+		req = container_of(node, struct io_kiocb, io_task_work.node);
+		node = node->next;
+		if (llist_add(&req->io_task_work.node,
+			      &req->ctx->fallback_llist))
+			schedule_delayed_work(&req->ctx->fallback_work, 1);
+	}
+}
+
 static void io_req_local_work_add(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -1127,11 +1196,10 @@ static void io_req_local_work_add(struct io_kiocb *req)
 	__io_cqring_wake(ctx);
 }
 
-static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
+void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
 {
 	struct io_uring_task *tctx = req->task->io_uring;
 	struct io_ring_ctx *ctx = req->ctx;
-	struct llist_node *node;
 
 	if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
 		io_req_local_work_add(req);
@@ -1148,20 +1216,7 @@ static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local
 	if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
 		return;
 
-	node = llist_del_all(&tctx->task_list);
-
-	while (node) {
-		req = container_of(node, struct io_kiocb, io_task_work.node);
-		node = node->next;
-		if (llist_add(&req->io_task_work.node,
-			      &req->ctx->fallback_llist))
-			schedule_delayed_work(&req->ctx->fallback_work, 1);
-	}
-}
-
-void io_req_task_work_add(struct io_kiocb *req)
-{
-	__io_req_task_work_add(req, true);
+	io_fallback_tw(tctx);
 }
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
@@ -1237,23 +1292,10 @@ int io_run_local_work(struct io_ring_ctx *ctx)
 	return ret;
 }
 
-static void io_req_tw_post(struct io_kiocb *req, bool *locked)
-{
-	io_req_complete_post(req);
-}
-
-void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags)
-{
-	io_req_set_res(req, res, cflags);
-	req->io_task_work.func = io_req_tw_post;
-	io_req_task_work_add(req);
-}
-
 static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
 {
-	/* not needed for normal modes, but SQPOLL depends on it */
 	io_tw_lock(req->ctx, locked);
-	io_req_complete_failed(req, req->cqe.res);
+	io_req_defer_failed(req, req->cqe.res);
 }
 
 void io_req_task_submit(struct io_kiocb *req, bool *locked)
@@ -1263,7 +1305,7 @@ void io_req_task_submit(struct io_kiocb *req, bool *locked)
 	if (likely(!(req->task->flags & PF_EXITING)))
 		io_queue_sqe(req);
 	else
-		io_req_complete_failed(req, -EFAULT);
+		io_req_defer_failed(req, -EFAULT);
 }
 
 void io_req_task_queue_fail(struct io_kiocb *req, int ret)
@@ -1344,6 +1386,9 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 	struct io_submit_state *state = &ctx->submit_state;
 
 	io_cq_lock(ctx);
+	/* must come first to preserve CQE ordering in failure cases */
+	if (state->cqes_count)
+		__io_flush_post_cqes(ctx);
 	wq_list_for_each(node, prev, &state->compl_reqs) {
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 					    comp_list);
@@ -1351,10 +1396,12 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 		if (!(req->flags & REQ_F_CQE_SKIP))
 			__io_fill_cqe_req(ctx, req);
 	}
-	__io_cq_unlock_post(ctx);
+	io_cq_unlock_post_inline(ctx);
 
-	io_free_batch_list(ctx, state->compl_reqs.first);
-	INIT_WQ_LIST(&state->compl_reqs);
+	if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
+		io_free_batch_list(ctx, state->compl_reqs.first);
+		INIT_WQ_LIST(&state->compl_reqs);
+	}
 }
 
 /*
@@ -1476,16 +1523,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
 
 void io_req_task_complete(struct io_kiocb *req, bool *locked)
 {
-	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
-		unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
-
-		req->cqe.flags |= io_put_kbuf(req, issue_flags);
-	}
-
 	if (*locked)
 		io_req_complete_defer(req);
 	else
-		io_req_complete_post(req);
+		io_req_complete_post(req, IO_URING_F_UNLOCKED);
 }
 
 /*
@@ -1635,6 +1676,7 @@ static u32 io_get_sequence(struct io_kiocb *req)
 }
 
 static __cold void io_drain_req(struct io_kiocb *req)
+	__must_hold(&ctx->uring_lock)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_defer_entry *de;
@@ -1655,7 +1697,7 @@ queue:
 		ret = io_req_prep_async(req);
 		if (ret) {
 fail:
-			io_req_complete_failed(req, ret);
+			io_req_defer_failed(req, ret);
 			return;
 		}
 		io_prep_async_link(req);
@@ -1752,7 +1794,7 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
 		if (issue_flags & IO_URING_F_COMPLETE_DEFER)
 			io_req_complete_defer(req);
 		else
-			io_req_complete_post(req);
+			io_req_complete_post(req, issue_flags);
 	} else if (ret != IOU_ISSUE_SKIP_COMPLETE)
 		return ret;
 
@@ -1768,7 +1810,8 @@ int io_poll_issue(struct io_kiocb *req, bool *locked)
 	io_tw_lock(req->ctx, locked);
 	if (unlikely(req->task->flags & PF_EXITING))
 		return -EFAULT;
-	return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT);
+	return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
+				 IO_URING_F_COMPLETE_DEFER);
}
 
 struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
@@ -1787,7 +1830,7 @@ void io_wq_submit_work(struct io_wq_work *work)
 	bool needs_poll = false;
 	int ret = 0, err = -ECANCELED;
 
-	/* one will be dropped by ->io_free_work() after returning to io-wq */
+	/* one will be dropped by ->io_wq_free_work() after returning to io-wq */
 	if (!(req->flags & REQ_F_REFCOUNT))
 		__io_req_set_refcount(req, 2);
 	else
@@ -1885,7 +1928,7 @@ static void io_queue_async(struct io_kiocb *req, int ret)
 	struct io_kiocb *linked_timeout;
 
 	if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
-		io_req_complete_failed(req, ret);
+		io_req_defer_failed(req, ret);
 		return;
 	}
 
@@ -1935,14 +1978,14 @@ static void io_queue_sqe_fallback(struct io_kiocb *req)
 		 */
 		req->flags &= ~REQ_F_HARDLINK;
 		req->flags |= REQ_F_LINK;
-		io_req_complete_failed(req, req->cqe.res);
+		io_req_defer_failed(req, req->cqe.res);
 	} else if (unlikely(req->ctx->drain_active)) {
 		io_drain_req(req);
 	} else {
 		int ret = io_req_prep_async(req);
 
 		if (unlikely(ret))
-			io_req_complete_failed(req, ret);
+			io_req_defer_failed(req, ret);
 		else
 			io_queue_iowq(req, NULL);
 	}
@@ -2670,7 +2713,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
 	 *      lock(&ep->mtx);
 	 *
 	 * Users may get EPOLLIN meanwhile seeing nothing in cqring, this
-	 * pushs them to do the flush.
+	 * pushes them to do the flush.
 	 */
 
 	if (io_cqring_events(ctx) || io_has_work(ctx))
@@ -2871,7 +2914,7 @@ static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
 	while (!list_empty(&list)) {
 		de = list_first_entry(&list, struct io_defer_entry, list);
 		list_del_init(&de->list);
-		io_req_complete_failed(de->req, -ECANCELED);
+		io_req_task_queue_fail(de->req, -ECANCELED);
 		kfree(de);
 	}
 	return true;
@@ -4064,8 +4107,6 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
 
 	ctx = f.file->private_data;
 
-	io_run_task_work_ctx(ctx);
-
 	mutex_lock(&ctx->uring_lock);
 	ret = __io_uring_register(ctx, opcode, arg, nr_args);
 	mutex_unlock(&ctx->uring_lock);