| author | Linus Torvalds <torvalds@linux-foundation.org> | 2026-02-10 04:22:00 +0300 |
|---|---|---|
| committer | Linus Torvalds <torvalds@linux-foundation.org> | 2026-02-10 04:22:00 +0300 |
| commit | f5d4feed174ce9fb3c42886a3c36038fd5a43e25 (patch) | |
| tree | 2e1940643a141621ef28b2ecb757d0dbce6ef9d7 /io_uring/io_uring.c | |
| parent | 26c9342bb761e463774a64fb6210b4f95f5bc035 (diff) | |
| parent | 442ae406603a94f1a263654494f425302ceb0445 (diff) | |
| download | linux-f5d4feed174ce9fb3c42886a3c36038fd5a43e25.tar.xz | |
Merge tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux
Pull io_uring updates from Jens Axboe:
- Clean up the IORING_SETUP_R_DISABLED and submitter task checking,
mostly just in preparation for relaxing the locking for SINGLE_ISSUER
in the future.
- Improve IOPOLL by using a doubly linked list to manage completions.
Previously a singly linked list was used, which meant that to complete
request N in the chain, requests 0..N-1 had to have completed first.
With a doubly linked list, requests can be completed in whatever order
they finish, rather than having to wait for a consecutive range to
become available. This reduces latencies (see the reaping sketch after
this list).
- Improve the restriction setup and checking. Mostly in preparation for
adding further features on top of that. Coming in a separate pull
request.
- Split out task_work and wait handling into separate files. These are
mostly nicely abstracted already, but still remained in the
io_uring.c file which is on the larger side.
- Use GFP_KERNEL_ACCOUNT in a few more spots, where appropriate.
- Ensure that even idle io-wq workers exit if a task no longer has any
rings open.
- Add support for a non-circular submission queue.
By default, submissions keep cycling through the SQ ring, even if only
a few entries are used for each submission. This can be wasteful in
terms of cachelines.
If IORING_SETUP_SQ_REWIND is set for the ring when created, each
submission will start at offset 0 instead of where the previous
submission left off (see the setup sketch after this list).
- Various little cleanups
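For the IOPOLL change above, here is a minimal sketch of the reaping loop
a doubly linked list enables, using the iopoll_list/iopoll_node names
visible in the diff below; the helper name and the CQE-posting step are
illustrative assumptions, not the actual io_do_iopoll() implementation:

```c
#include <linux/list.h>
#include <linux/io_uring_types.h>	/* struct io_kiocb, struct io_ring_ctx */

/*
 * Sketch only: with a doubly linked list, any request that has already
 * completed can be unlinked from the middle of the poll list in O(1),
 * without waiting for the requests queued before it.
 */
static int reap_completed_iopoll(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req, *tmp;
	int reaped = 0;

	list_for_each_entry_safe(req, tmp, &ctx->iopoll_list, iopoll_node) {
		if (!READ_ONCE(req->iopoll_completed))
			continue;			/* not done yet, skip it */
		list_del(&req->iopoll_node);		/* remove from anywhere in the list */
		/* ... post the CQE for req here ... */
		reaped++;
	}
	return reaped;
}
```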
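And a userspace sketch of creating a ring with the new rewind behaviour,
assuming the IORING_SETUP_SQ_REWIND and IORING_SETUP_NO_SQARRAY flag values
exported by this tree's uapi header; the helper name and error handling are
illustrative only:

```c
#include <linux/io_uring.h>	/* struct io_uring_params, IORING_SETUP_* */
#include <sys/syscall.h>
#include <string.h>
#include <unistd.h>

/*
 * Create a ring whose SQ restarts at index 0 on every submission.
 * Per the setup checks in this merge, IORING_SETUP_SQ_REWIND requires
 * IORING_SETUP_NO_SQARRAY and is rejected with IORING_SETUP_SQPOLL.
 */
static int setup_rewind_ring(unsigned int entries)
{
	struct io_uring_params p;

	memset(&p, 0, sizeof(p));
	p.flags = IORING_SETUP_SQ_REWIND | IORING_SETUP_NO_SQARRAY;

	/* returns a ring fd, or -1 with errno set (EINVAL on older kernels) */
	return syscall(__NR_io_uring_setup, entries, &p);
}
```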
* tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux: (30 commits)
io_uring/kbuf: fix memory leak if io_buffer_add_list fails
io_uring: Add SPDX id lines to remaining source files
io_uring: allow io-wq workers to exit when unused
io_uring/io-wq: add exit-on-idle state
io_uring/net: don't continue send bundle if poll was required for retry
io_uring/rsrc: use GFP_KERNEL_ACCOUNT consistently
io_uring/futex: use GFP_KERNEL_ACCOUNT for futex data allocation
io_uring/io-wq: handle !sysctl_hung_task_timeout_secs
io_uring: fix bad indentation for setup flags if statement
io_uring/rsrc: take unsigned index in io_rsrc_node_lookup()
io_uring: introduce non-circular SQ
io_uring: split out CQ waiting code into wait.c
io_uring: split out task work code into tw.c
io_uring/io-wq: don't trigger hung task for syzbot craziness
io_uring: add IO_URING_EXIT_WAIT_MAX definition
io_uring/sync: validate passed in offset
io_uring/eventfd: remove unused ctx->evfd_last_cq_tail member
io_uring/timeout: annotate data race in io_flush_timeouts()
io_uring/uring_cmd: explicitly disallow cancelations for IOPOLL
io_uring: fix IOPOLL with passthrough I/O
...
Diffstat (limited to 'io_uring/io_uring.c')
| -rw-r--r-- | io_uring/io_uring.c | 782 |
1 file changed, 43 insertions, 739 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index b7a077c11c21..2ca561881ef7 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -40,37 +40,25 @@ * Copyright (c) 2018-2019 Christoph Hellwig */ #include <linux/kernel.h> -#include <linux/init.h> #include <linux/errno.h> #include <linux/syscalls.h> -#include <net/compat.h> #include <linux/refcount.h> -#include <linux/uio.h> #include <linux/bits.h> #include <linux/sched/signal.h> #include <linux/fs.h> -#include <linux/file.h> #include <linux/mm.h> -#include <linux/mman.h> #include <linux/percpu.h> #include <linux/slab.h> -#include <linux/bvec.h> -#include <linux/net.h> -#include <net/sock.h> #include <linux/anon_inodes.h> -#include <linux/sched/mm.h> #include <linux/uaccess.h> #include <linux/nospec.h> -#include <linux/fsnotify.h> -#include <linux/fadvise.h> #include <linux/task_work.h> #include <linux/io_uring.h> #include <linux/io_uring/cmd.h> #include <linux/audit.h> #include <linux/security.h> #include <linux/jump_label.h> -#include <asm/shmparam.h> #define CREATE_TRACE_POINTS #include <trace/events/io_uring.h> @@ -105,6 +93,7 @@ #include "rw.h" #include "alloc_cache.h" #include "eventfd.h" +#include "wait.h" #define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \ IOSQE_IO_HARDLINK | IOSQE_ASYNC) @@ -122,19 +111,10 @@ #define IO_COMPL_BATCH 32 #define IO_REQ_ALLOC_BATCH 8 -#define IO_LOCAL_TW_DEFAULT_MAX 20 /* requests with any of those set should undergo io_disarm_next() */ #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL) -/* - * No waiters. It's larger than any valid value of the tw counter - * so that tests against ->cq_wait_nr would fail and skip wake_up(). - */ -#define IO_CQ_WAKE_INIT (-1U) -/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */ -#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1) - static void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags); static void __io_req_caches_free(struct io_ring_ctx *ctx); @@ -187,16 +167,6 @@ static void io_poison_req(struct io_kiocb *req) req->link = IO_URING_PTR_POISON; } -static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx) -{ - return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head); -} - -static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx) -{ - return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head); -} - static inline void req_fail_link_node(struct io_kiocb *req, int res) { req_set_fail(req); @@ -217,38 +187,6 @@ static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref) complete(&ctx->ref_comp); } -/* - * Terminate the request if either of these conditions are true: - * - * 1) It's being executed by the original task, but that task is marked - * with PF_EXITING as it's exiting. - * 2) PF_KTHREAD is set, in which case the invoker of the task_work is - * our fallback task_work. - * 3) The ring has been closed and is going away. 
- */ -static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx) -{ - return (current->flags & (PF_EXITING | PF_KTHREAD)) || percpu_ref_is_dying(&ctx->refs); -} - -static __cold void io_fallback_req_func(struct work_struct *work) -{ - struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, - fallback_work.work); - struct llist_node *node = llist_del_all(&ctx->fallback_llist); - struct io_kiocb *req, *tmp; - struct io_tw_state ts = {}; - - percpu_ref_get(&ctx->refs); - mutex_lock(&ctx->uring_lock); - ts.cancel = io_should_terminate_tw(ctx); - llist_for_each_entry_safe(req, tmp, node, io_task_work.node) - req->io_task_work.func((struct io_tw_req){req}, ts); - io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); - percpu_ref_put(&ctx->refs); -} - static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits) { unsigned int hash_buckets; @@ -334,7 +272,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_waitqueue_head(&ctx->poll_wq); spin_lock_init(&ctx->completion_lock); raw_spin_lock_init(&ctx->timeout_lock); - INIT_WQ_LIST(&ctx->iopoll_list); + INIT_LIST_HEAD(&ctx->iopoll_list); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->ltimeout_list); @@ -643,7 +581,7 @@ static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) __io_cqring_overflow_flush(ctx, true); } -static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx) +void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx) { mutex_lock(&ctx->uring_lock); __io_cqring_overflow_flush(ctx, false); @@ -1083,336 +1021,6 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req) return nxt; } -static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw) -{ - if (!ctx) - return; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - - io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); - percpu_ref_put(&ctx->refs); -} - -/* - * Run queued task_work, returning the number of entries processed in *count. - * If more entries than max_entries are available, stop processing once this - * is reached and return the rest of the list. 
- */ -struct llist_node *io_handle_tw_list(struct llist_node *node, - unsigned int *count, - unsigned int max_entries) -{ - struct io_ring_ctx *ctx = NULL; - struct io_tw_state ts = { }; - - do { - struct llist_node *next = node->next; - struct io_kiocb *req = container_of(node, struct io_kiocb, - io_task_work.node); - - if (req->ctx != ctx) { - ctx_flush_and_put(ctx, ts); - ctx = req->ctx; - mutex_lock(&ctx->uring_lock); - percpu_ref_get(&ctx->refs); - ts.cancel = io_should_terminate_tw(ctx); - } - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - (struct io_tw_req){req}, ts); - node = next; - (*count)++; - if (unlikely(need_resched())) { - ctx_flush_and_put(ctx, ts); - ctx = NULL; - cond_resched(); - } - } while (node && *count < max_entries); - - ctx_flush_and_put(ctx, ts); - return node; -} - -static __cold void __io_fallback_tw(struct llist_node *node, bool sync) -{ - struct io_ring_ctx *last_ctx = NULL; - struct io_kiocb *req; - - while (node) { - req = container_of(node, struct io_kiocb, io_task_work.node); - node = node->next; - if (last_ctx != req->ctx) { - if (last_ctx) { - if (sync) - flush_delayed_work(&last_ctx->fallback_work); - percpu_ref_put(&last_ctx->refs); - } - last_ctx = req->ctx; - percpu_ref_get(&last_ctx->refs); - } - if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist)) - schedule_delayed_work(&last_ctx->fallback_work, 1); - } - - if (last_ctx) { - if (sync) - flush_delayed_work(&last_ctx->fallback_work); - percpu_ref_put(&last_ctx->refs); - } -} - -static void io_fallback_tw(struct io_uring_task *tctx, bool sync) -{ - struct llist_node *node = llist_del_all(&tctx->task_list); - - __io_fallback_tw(node, sync); -} - -struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, - unsigned int max_entries, - unsigned int *count) -{ - struct llist_node *node; - - node = llist_del_all(&tctx->task_list); - if (node) { - node = llist_reverse_order(node); - node = io_handle_tw_list(node, count, max_entries); - } - - /* relaxed read is enough as only the task itself sets ->in_cancel */ - if (unlikely(atomic_read(&tctx->in_cancel))) - io_uring_drop_tctx_refs(current); - - trace_io_uring_task_work_run(tctx, *count); - return node; -} - -void tctx_task_work(struct callback_head *cb) -{ - struct io_uring_task *tctx; - struct llist_node *ret; - unsigned int count = 0; - - tctx = container_of(cb, struct io_uring_task, task_work); - ret = tctx_task_work_run(tctx, UINT_MAX, &count); - /* can't happen */ - WARN_ON_ONCE(ret); -} - -static void io_req_local_work_add(struct io_kiocb *req, unsigned flags) -{ - struct io_ring_ctx *ctx = req->ctx; - unsigned nr_wait, nr_tw, nr_tw_prev; - struct llist_node *head; - - /* See comment above IO_CQ_WAKE_INIT */ - BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); - - /* - * We don't know how many requests there are in the link and whether - * they can even be queued lazily, fall back to non-lazy. - */ - if (req->flags & IO_REQ_LINK_FLAGS) - flags &= ~IOU_F_TWQ_LAZY_WAKE; - - guard(rcu)(); - - head = READ_ONCE(ctx->work_llist.first); - do { - nr_tw_prev = 0; - if (head) { - struct io_kiocb *first_req = container_of(head, - struct io_kiocb, - io_task_work.node); - /* - * Might be executed at any moment, rely on - * SLAB_TYPESAFE_BY_RCU to keep it alive. - */ - nr_tw_prev = READ_ONCE(first_req->nr_tw); - } - - /* - * Theoretically, it can overflow, but that's fine as one of - * previous adds should've tried to wake the task. 
- */ - nr_tw = nr_tw_prev + 1; - if (!(flags & IOU_F_TWQ_LAZY_WAKE)) - nr_tw = IO_CQ_WAKE_FORCE; - - req->nr_tw = nr_tw; - req->io_task_work.node.next = head; - } while (!try_cmpxchg(&ctx->work_llist.first, &head, - &req->io_task_work.node)); - - /* - * cmpxchg implies a full barrier, which pairs with the barrier - * in set_current_state() on the io_cqring_wait() side. It's used - * to ensure that either we see updated ->cq_wait_nr, or waiters - * going to sleep will observe the work added to the list, which - * is similar to the wait/wawke task state sync. - */ - - if (!head) { - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - if (ctx->has_evfd) - io_eventfd_signal(ctx, false); - } - - nr_wait = atomic_read(&ctx->cq_wait_nr); - /* not enough or no one is waiting */ - if (nr_tw < nr_wait) - return; - /* the previous add has already woken it up */ - if (nr_tw_prev >= nr_wait) - return; - wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); -} - -static void io_req_normal_work_add(struct io_kiocb *req) -{ - struct io_uring_task *tctx = req->tctx; - struct io_ring_ctx *ctx = req->ctx; - - /* task_work already pending, we're done */ - if (!llist_add(&req->io_task_work.node, &tctx->task_list)) - return; - - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - - /* SQPOLL doesn't need the task_work added, it'll run it itself */ - if (ctx->flags & IORING_SETUP_SQPOLL) { - __set_notify_signal(tctx->task); - return; - } - - if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method))) - return; - - io_fallback_tw(tctx, false); -} - -void __io_req_task_work_add(struct io_kiocb *req, unsigned flags) -{ - if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) - io_req_local_work_add(req, flags); - else - io_req_normal_work_add(req); -} - -void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags) -{ - if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN))) - return; - __io_req_task_work_add(req, flags); -} - -static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) -{ - struct llist_node *node = llist_del_all(&ctx->work_llist); - - __io_fallback_tw(node, false); - node = llist_del_all(&ctx->retry_llist); - __io_fallback_tw(node, false); -} - -static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, - int min_events) -{ - if (!io_local_work_pending(ctx)) - return false; - if (events < min_events) - return true; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - return false; -} - -static int __io_run_local_work_loop(struct llist_node **node, - io_tw_token_t tw, - int events) -{ - int ret = 0; - - while (*node) { - struct llist_node *next = (*node)->next; - struct io_kiocb *req = container_of(*node, struct io_kiocb, - io_task_work.node); - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - (struct io_tw_req){req}, tw); - *node = next; - if (++ret >= events) - break; - } - - return ret; -} - -static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw, - int min_events, int max_events) -{ - struct llist_node *node; - unsigned int loops = 0; - int ret = 0; - - if (WARN_ON_ONCE(ctx->submitter_task != current)) - return -EEXIST; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); -again: - tw.cancel = io_should_terminate_tw(ctx); - min_events -= ret; - ret = 
__io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events); - if (ctx->retry_llist.first) - goto retry_done; - - /* - * llists are in reverse order, flip it back the right way before - * running the pending items. - */ - node = llist_reverse_order(llist_del_all(&ctx->work_llist)); - ret += __io_run_local_work_loop(&node, tw, max_events - ret); - ctx->retry_llist.first = node; - loops++; - - if (io_run_local_work_continue(ctx, ret, min_events)) - goto again; -retry_done: - io_submit_flush_completions(ctx); - if (io_run_local_work_continue(ctx, ret, min_events)) - goto again; - - trace_io_uring_local_work_run(ctx, ret, loops); - return ret; -} - -static inline int io_run_local_work_locked(struct io_ring_ctx *ctx, - int min_events) -{ - struct io_tw_state ts = {}; - - if (!io_local_work_pending(ctx)) - return 0; - return __io_run_local_work(ctx, ts, min_events, - max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); -} - -int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events) -{ - struct io_tw_state ts = {}; - int ret; - - mutex_lock(&ctx->uring_lock); - ret = __io_run_local_work(ctx, ts, min_events, max_events); - mutex_unlock(&ctx->uring_lock); - return ret; -} - static void io_req_task_cancel(struct io_tw_req tw_req, io_tw_token_t tw) { struct io_kiocb *req = tw_req.req; @@ -1545,13 +1153,6 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx) ctx->submit_state.cq_flush = false; } -static unsigned io_cqring_events(struct io_ring_ctx *ctx) -{ - /* See comment at the top of this file */ - smp_rmb(); - return __io_cqring_events(ctx); -} - /* * We can't just wait for polled events to come to us, we have to actively * find and complete them. @@ -1562,7 +1163,7 @@ __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx) return; mutex_lock(&ctx->uring_lock); - while (!wq_list_empty(&ctx->iopoll_list)) { + while (!list_empty(&ctx->iopoll_list)) { /* let it sleep and repeat later if can't complete a request */ if (io_do_iopoll(ctx, true) == 0) break; @@ -1627,21 +1228,18 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned int min_events) * forever, while the workqueue is stuck trying to acquire the * very same mutex. */ - if (wq_list_empty(&ctx->iopoll_list) || - io_task_work_pending(ctx)) { + if (list_empty(&ctx->iopoll_list) || io_task_work_pending(ctx)) { u32 tail = ctx->cached_cq_tail; (void) io_run_local_work_locked(ctx, min_events); - if (task_work_pending(current) || - wq_list_empty(&ctx->iopoll_list)) { + if (task_work_pending(current) || list_empty(&ctx->iopoll_list)) { mutex_unlock(&ctx->uring_lock); io_run_task_work(); mutex_lock(&ctx->uring_lock); } /* some requests don't go through iopoll_list */ - if (tail != ctx->cached_cq_tail || - wq_list_empty(&ctx->iopoll_list)) + if (tail != ctx->cached_cq_tail || list_empty(&ctx->iopoll_list)) break; } ret = io_do_iopoll(ctx, !min_events); @@ -1684,25 +1282,17 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags) * how we do polling eventually, not spinning if we're on potentially * different devices. */ - if (wq_list_empty(&ctx->iopoll_list)) { + if (list_empty(&ctx->iopoll_list)) { ctx->poll_multi_queue = false; } else if (!ctx->poll_multi_queue) { struct io_kiocb *list_req; - list_req = container_of(ctx->iopoll_list.first, struct io_kiocb, - comp_list); + list_req = list_first_entry(&ctx->iopoll_list, struct io_kiocb, iopoll_node); if (list_req->file != req->file) ctx->poll_multi_queue = true; } - /* - * For fast devices, IO may have already completed. 
If it has, add - * it to the front so we find it first. - */ - if (READ_ONCE(req->iopoll_completed)) - wq_list_add_head(&req->comp_list, &ctx->iopoll_list); - else - wq_list_add_tail(&req->comp_list, &ctx->iopoll_list); + list_add_tail(&req->iopoll_node, &ctx->iopoll_list); if (unlikely(needs_lock)) { /* @@ -2080,6 +1670,8 @@ static inline bool io_check_restriction(struct io_ring_ctx *ctx, struct io_kiocb *req, unsigned int sqe_flags) { + if (!ctx->op_restricted) + return true; if (!test_bit(req->opcode, ctx->restrictions.sqe_op)) return false; @@ -2181,8 +1773,8 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, io_init_drain(ctx); } } - if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) { - if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags)) + if (unlikely(ctx->op_restricted || ctx->drain_active || ctx->drain_next)) { + if (!io_check_restriction(ctx, req, sqe_flags)) return io_init_fail_req(req, -EACCES); /* knock it to the slow queue path, will be drained there */ if (ctx->drain_active) @@ -2354,12 +1946,16 @@ static void io_commit_sqring(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; - /* - * Ensure any loads from the SQEs are done at this point, - * since once we write the new head, the application could - * write new data to them. - */ - smp_store_release(&rings->sq.head, ctx->cached_sq_head); + if (ctx->flags & IORING_SETUP_SQ_REWIND) { + ctx->cached_sq_head = 0; + } else { + /* + * Ensure any loads from the SQEs are done at this point, + * since once we write the new head, the application could + * write new data to them. + */ + smp_store_release(&rings->sq.head, ctx->cached_sq_head); + } } /* @@ -2405,10 +2001,15 @@ static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe) int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) __must_hold(&ctx->uring_lock) { - unsigned int entries = io_sqring_entries(ctx); + unsigned int entries; unsigned int left; int ret; + if (ctx->flags & IORING_SETUP_SQ_REWIND) + entries = ctx->sq_entries; + else + entries = io_sqring_entries(ctx); + entries = min(nr, entries); if (unlikely(!entries)) return 0; @@ -2453,308 +2054,6 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) return ret; } -static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, - int wake_flags, void *key) -{ - struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq); - - /* - * Cannot safely flush overflowed CQEs from here, ensure we wake up - * the task, and the next invocation will do it. 
- */ - if (io_should_wake(iowq) || io_has_work(iowq->ctx)) - return autoremove_wake_function(curr, mode, wake_flags, key); - return -1; -} - -int io_run_task_work_sig(struct io_ring_ctx *ctx) -{ - if (io_local_work_pending(ctx)) { - __set_current_state(TASK_RUNNING); - if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0) - return 0; - } - if (io_run_task_work() > 0) - return 0; - if (task_sigpending(current)) - return -EINTR; - return 0; -} - -static bool current_pending_io(void) -{ - struct io_uring_task *tctx = current->io_uring; - - if (!tctx) - return false; - return percpu_counter_read_positive(&tctx->inflight); -} - -static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer) -{ - struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); - - WRITE_ONCE(iowq->hit_timeout, 1); - iowq->min_timeout = 0; - wake_up_process(iowq->wq.private); - return HRTIMER_NORESTART; -} - -/* - * Doing min_timeout portion. If we saw any timeouts, events, or have work, - * wake up. If not, and we have a normal timeout, switch to that and keep - * sleeping. - */ -static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) -{ - struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); - struct io_ring_ctx *ctx = iowq->ctx; - - /* no general timeout, or shorter (or equal), we are done */ - if (iowq->timeout == KTIME_MAX || - ktime_compare(iowq->min_timeout, iowq->timeout) >= 0) - goto out_wake; - /* work we may need to run, wake function will see if we need to wake */ - if (io_has_work(ctx)) - goto out_wake; - /* got events since we started waiting, min timeout is done */ - if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail)) - goto out_wake; - /* if we have any events and min timeout expired, we're done */ - if (io_cqring_events(ctx)) - goto out_wake; - - /* - * If using deferred task_work running and application is waiting on - * more than one request, ensure we reset it now where we are switching - * to normal sleeps. Any request completion post min_wait should wake - * the task and return. - */ - if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - atomic_set(&ctx->cq_wait_nr, 1); - smp_mb(); - if (!llist_empty(&ctx->work_llist)) - goto out_wake; - } - - /* any generated CQE posted past this time should wake us up */ - iowq->cq_tail = iowq->cq_min_tail; - - hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup); - hrtimer_set_expires(timer, iowq->timeout); - return HRTIMER_RESTART; -out_wake: - return io_cqring_timer_wakeup(timer); -} - -static int io_cqring_schedule_timeout(struct io_wait_queue *iowq, - clockid_t clock_id, ktime_t start_time) -{ - ktime_t timeout; - - if (iowq->min_timeout) { - timeout = ktime_add_ns(iowq->min_timeout, start_time); - hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id, - HRTIMER_MODE_ABS); - } else { - timeout = iowq->timeout; - hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id, - HRTIMER_MODE_ABS); - } - - hrtimer_set_expires_range_ns(&iowq->t, timeout, 0); - hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS); - - if (!READ_ONCE(iowq->hit_timeout)) - schedule(); - - hrtimer_cancel(&iowq->t); - destroy_hrtimer_on_stack(&iowq->t); - __set_current_state(TASK_RUNNING); - - return READ_ONCE(iowq->hit_timeout) ? 
-ETIME : 0; -} - -struct ext_arg { - size_t argsz; - struct timespec64 ts; - const sigset_t __user *sig; - ktime_t min_time; - bool ts_set; - bool iowait; -}; - -static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, - struct ext_arg *ext_arg, - ktime_t start_time) -{ - int ret = 0; - - /* - * Mark us as being in io_wait if we have pending requests, so cpufreq - * can take into account that the task is waiting for IO - turns out - * to be important for low QD IO. - */ - if (ext_arg->iowait && current_pending_io()) - current->in_iowait = 1; - if (iowq->timeout != KTIME_MAX || iowq->min_timeout) - ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time); - else - schedule(); - current->in_iowait = 0; - return ret; -} - -/* If this returns > 0, the caller should retry */ -static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, - struct ext_arg *ext_arg, - ktime_t start_time) -{ - if (unlikely(READ_ONCE(ctx->check_cq))) - return 1; - if (unlikely(io_local_work_pending(ctx))) - return 1; - if (unlikely(task_work_pending(current))) - return 1; - if (unlikely(task_sigpending(current))) - return -EINTR; - if (unlikely(io_should_wake(iowq))) - return 0; - - return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time); -} - -/* - * Wait until events become available, if we don't already have some. The - * application must reap them itself, as they reside on the shared cq ring. - */ -static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, - struct ext_arg *ext_arg) -{ - struct io_wait_queue iowq; - struct io_rings *rings = ctx->rings; - ktime_t start_time; - int ret; - - min_events = min_t(int, min_events, ctx->cq_entries); - - if (!io_allowed_run_tw(ctx)) - return -EEXIST; - if (io_local_work_pending(ctx)) - io_run_local_work(ctx, min_events, - max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); - io_run_task_work(); - - if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))) - io_cqring_do_overflow_flush(ctx); - if (__io_cqring_events_user(ctx) >= min_events) - return 0; - - init_waitqueue_func_entry(&iowq.wq, io_wake_function); - iowq.wq.private = current; - INIT_LIST_HEAD(&iowq.wq.entry); - iowq.ctx = ctx; - iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events; - iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail); - iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); - iowq.hit_timeout = 0; - iowq.min_timeout = ext_arg->min_time; - iowq.timeout = KTIME_MAX; - start_time = io_get_time(ctx); - - if (ext_arg->ts_set) { - iowq.timeout = timespec64_to_ktime(ext_arg->ts); - if (!(flags & IORING_ENTER_ABS_TIMER)) - iowq.timeout = ktime_add(iowq.timeout, start_time); - } - - if (ext_arg->sig) { -#ifdef CONFIG_COMPAT - if (in_compat_syscall()) - ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig, - ext_arg->argsz); - else -#endif - ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz); - - if (ret) - return ret; - } - - io_napi_busy_loop(ctx, &iowq); - - trace_io_uring_cqring_wait(ctx, min_events); - do { - unsigned long check_cq; - int nr_wait; - - /* if min timeout has been hit, don't reset wait count */ - if (!iowq.hit_timeout) - nr_wait = (int) iowq.cq_tail - - READ_ONCE(ctx->rings->cq.tail); - else - nr_wait = 1; - - if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - atomic_set(&ctx->cq_wait_nr, nr_wait); - set_current_state(TASK_INTERRUPTIBLE); - } else { - prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, - TASK_INTERRUPTIBLE); - } - - ret = 
io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time); - __set_current_state(TASK_RUNNING); - atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT); - - /* - * Run task_work after scheduling and before io_should_wake(). - * If we got woken because of task_work being processed, run it - * now rather than let the caller do another wait loop. - */ - if (io_local_work_pending(ctx)) - io_run_local_work(ctx, nr_wait, nr_wait); - io_run_task_work(); - - /* - * Non-local task_work will be run on exit to userspace, but - * if we're using DEFER_TASKRUN, then we could have waited - * with a timeout for a number of requests. If the timeout - * hits, we could have some requests ready to process. Ensure - * this break is _after_ we have run task_work, to avoid - * deferring running potentially pending requests until the - * next time we wait for events. - */ - if (ret < 0) - break; - - check_cq = READ_ONCE(ctx->check_cq); - if (unlikely(check_cq)) { - /* let the caller flush overflows, retry */ - if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - io_cqring_do_overflow_flush(ctx); - if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) { - ret = -EBADR; - break; - } - } - - if (io_should_wake(&iowq)) { - ret = 0; - break; - } - cond_resched(); - } while (1); - - if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) - finish_wait(&ctx->cq_wait, &iowq.wq); - restore_saved_sigmask_unless(ret == -EINTR); - - return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0; -} - static void io_rings_free(struct io_ring_ctx *ctx) { io_free_region(ctx->user, &ctx->sq_region); @@ -2984,7 +2283,7 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb) static __cold void io_ring_exit_work(struct work_struct *work) { struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work); - unsigned long timeout = jiffies + HZ * 60 * 5; + unsigned long timeout = jiffies + IO_URING_EXIT_WAIT_MAX; unsigned long interval = HZ / 20; struct io_tctx_exit exit; struct io_tctx_node *node; @@ -3256,7 +2555,11 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ctx = file->private_data; ret = -EBADFD; - if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED)) + /* + * Keep IORING_SETUP_R_DISABLED check before submitter_task load + * in io_uring_add_tctx_node() -> __io_uring_add_tctx_node_from_submit() + */ + if (unlikely(smp_load_acquire(&ctx->flags) & IORING_SETUP_R_DISABLED)) goto out; /* @@ -3439,6 +2742,12 @@ static int io_uring_sanitise_params(struct io_uring_params *p) if (flags & ~IORING_SETUP_FLAGS) return -EINVAL; + if (flags & IORING_SETUP_SQ_REWIND) { + if ((flags & IORING_SETUP_SQPOLL) || + !(flags & IORING_SETUP_NO_SQARRAY)) + return -EINVAL; + } + /* There is no way to mmap rings without a real fd */ if ((flags & IORING_SETUP_REGISTERED_FD_ONLY) && !(flags & IORING_SETUP_NO_MMAP)) @@ -3661,13 +2970,8 @@ static __cold int io_uring_create(struct io_ctx_config *config) } if (ctx->flags & IORING_SETUP_SINGLE_ISSUER - && !(ctx->flags & IORING_SETUP_R_DISABLED)) { - /* - * Unlike io_register_enable_rings(), don't need WRITE_ONCE() - * since ctx isn't yet accessible from other tasks - */ + && !(ctx->flags & IORING_SETUP_R_DISABLED)) ctx->submitter_task = get_task_struct(current); - } file = io_uring_get_file(ctx); if (IS_ERR(file)) { |
