summaryrefslogtreecommitdiff
path: root/io_uring/io_uring.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2026-02-10 04:22:00 +0300
committerLinus Torvalds <torvalds@linux-foundation.org>2026-02-10 04:22:00 +0300
commitf5d4feed174ce9fb3c42886a3c36038fd5a43e25 (patch)
tree2e1940643a141621ef28b2ecb757d0dbce6ef9d7 /io_uring/io_uring.c
parent26c9342bb761e463774a64fb6210b4f95f5bc035 (diff)
parent442ae406603a94f1a263654494f425302ceb0445 (diff)
downloadlinux-f5d4feed174ce9fb3c42886a3c36038fd5a43e25.tar.xz
Merge tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux
Pull io_uring updates from Jens Axboe: - Clean up the IORING_SETUP_R_DISABLED and submitter task checking, mostly just in preparation for relaxing the locking for SINGLE_ISSUER in the future. - Improve IOPOLL by using a doubly linked list to manage completions. Previously it was singly listed, which meant that to complete request N in the chain 0..N-1 had to have completed first. With a doubly linked list we can complete whatever request completes in that order, rather than need to wait for a consecutive range to be available. This reduces latencies. - Improve the restriction setup and checking. Mostly in preparation for adding further features on top of that. Coming in a separate pull request. - Split out task_work and wait handling into separate files. These are mostly nicely abstracted already, but still remained in the io_uring.c file which is on the larger side. - Use GFP_KERNEL_ACCOUNT in a few more spots, where appropriate. - Ensure even the idle io-wq worker exits if a task no longer has any rings open. - Add support for a non-circular submission queue. By default, the SQ ring keeps moving around, even if only a few entries are used for each submission. This can be wasteful in terms of cachelines. If IORING_SETUP_SQ_REWIND is set for the ring when created, each submission will start at offset 0 instead of where we last left off doing submissions. - Various little cleanups * tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux: (30 commits) io_uring/kbuf: fix memory leak if io_buffer_add_list fails io_uring: Add SPDX id lines to remaining source files io_uring: allow io-wq workers to exit when unused io_uring/io-wq: add exit-on-idle state io_uring/net: don't continue send bundle if poll was required for retry io_uring/rsrc: use GFP_KERNEL_ACCOUNT consistently io_uring/futex: use GFP_KERNEL_ACCOUNT for futex data allocation io_uring/io-wq: handle !sysctl_hung_task_timeout_secs io_uring: fix bad indentation for setup flags if statement io_uring/rsrc: take unsigned index in io_rsrc_node_lookup() io_uring: introduce non-circular SQ io_uring: split out CQ waiting code into wait.c io_uring: split out task work code into tw.c io_uring/io-wq: don't trigger hung task for syzbot craziness io_uring: add IO_URING_EXIT_WAIT_MAX definition io_uring/sync: validate passed in offset io_uring/eventfd: remove unused ctx->evfd_last_cq_tail member io_uring/timeout: annotate data race in io_flush_timeouts() io_uring/uring_cmd: explicitly disallow cancelations for IOPOLL io_uring: fix IOPOLL with passthrough I/O ...
Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r--io_uring/io_uring.c782
1 files changed, 43 insertions, 739 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b7a077c11c21..2ca561881ef7 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -40,37 +40,25 @@
* Copyright (c) 2018-2019 Christoph Hellwig
*/
#include <linux/kernel.h>
-#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
-#include <net/compat.h>
#include <linux/refcount.h>
-#include <linux/uio.h>
#include <linux/bits.h>
#include <linux/sched/signal.h>
#include <linux/fs.h>
-#include <linux/file.h>
#include <linux/mm.h>
-#include <linux/mman.h>
#include <linux/percpu.h>
#include <linux/slab.h>
-#include <linux/bvec.h>
-#include <linux/net.h>
-#include <net/sock.h>
#include <linux/anon_inodes.h>
-#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
-#include <linux/fsnotify.h>
-#include <linux/fadvise.h>
#include <linux/task_work.h>
#include <linux/io_uring.h>
#include <linux/io_uring/cmd.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/jump_label.h>
-#include <asm/shmparam.h>
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
@@ -105,6 +93,7 @@
#include "rw.h"
#include "alloc_cache.h"
#include "eventfd.h"
+#include "wait.h"
#define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
IOSQE_IO_HARDLINK | IOSQE_ASYNC)
@@ -122,19 +111,10 @@
#define IO_COMPL_BATCH 32
#define IO_REQ_ALLOC_BATCH 8
-#define IO_LOCAL_TW_DEFAULT_MAX 20
/* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
-/*
- * No waiters. It's larger than any valid value of the tw counter
- * so that tests against ->cq_wait_nr would fail and skip wake_up().
- */
-#define IO_CQ_WAKE_INIT (-1U)
-/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
-#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)
-
static void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags);
static void __io_req_caches_free(struct io_ring_ctx *ctx);
@@ -187,16 +167,6 @@ static void io_poison_req(struct io_kiocb *req)
req->link = IO_URING_PTR_POISON;
}
-static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
-{
- return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
-}
-
-static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
-{
- return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
-}
-
static inline void req_fail_link_node(struct io_kiocb *req, int res)
{
req_set_fail(req);
@@ -217,38 +187,6 @@ static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
complete(&ctx->ref_comp);
}
-/*
- * Terminate the request if either of these conditions are true:
- *
- * 1) It's being executed by the original task, but that task is marked
- * with PF_EXITING as it's exiting.
- * 2) PF_KTHREAD is set, in which case the invoker of the task_work is
- * our fallback task_work.
- * 3) The ring has been closed and is going away.
- */
-static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
-{
- return (current->flags & (PF_EXITING | PF_KTHREAD)) || percpu_ref_is_dying(&ctx->refs);
-}
-
-static __cold void io_fallback_req_func(struct work_struct *work)
-{
- struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
- fallback_work.work);
- struct llist_node *node = llist_del_all(&ctx->fallback_llist);
- struct io_kiocb *req, *tmp;
- struct io_tw_state ts = {};
-
- percpu_ref_get(&ctx->refs);
- mutex_lock(&ctx->uring_lock);
- ts.cancel = io_should_terminate_tw(ctx);
- llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
- req->io_task_work.func((struct io_tw_req){req}, ts);
- io_submit_flush_completions(ctx);
- mutex_unlock(&ctx->uring_lock);
- percpu_ref_put(&ctx->refs);
-}
-
static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
{
unsigned int hash_buckets;
@@ -334,7 +272,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
init_waitqueue_head(&ctx->poll_wq);
spin_lock_init(&ctx->completion_lock);
raw_spin_lock_init(&ctx->timeout_lock);
- INIT_WQ_LIST(&ctx->iopoll_list);
+ INIT_LIST_HEAD(&ctx->iopoll_list);
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
@@ -643,7 +581,7 @@ static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
__io_cqring_overflow_flush(ctx, true);
}
-static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
+void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
{
mutex_lock(&ctx->uring_lock);
__io_cqring_overflow_flush(ctx, false);
@@ -1083,336 +1021,6 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
return nxt;
}
-static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
-{
- if (!ctx)
- return;
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
- io_submit_flush_completions(ctx);
- mutex_unlock(&ctx->uring_lock);
- percpu_ref_put(&ctx->refs);
-}
-
-/*
- * Run queued task_work, returning the number of entries processed in *count.
- * If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
- */
-struct llist_node *io_handle_tw_list(struct llist_node *node,
- unsigned int *count,
- unsigned int max_entries)
-{
- struct io_ring_ctx *ctx = NULL;
- struct io_tw_state ts = { };
-
- do {
- struct llist_node *next = node->next;
- struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.node);
-
- if (req->ctx != ctx) {
- ctx_flush_and_put(ctx, ts);
- ctx = req->ctx;
- mutex_lock(&ctx->uring_lock);
- percpu_ref_get(&ctx->refs);
- ts.cancel = io_should_terminate_tw(ctx);
- }
- INDIRECT_CALL_2(req->io_task_work.func,
- io_poll_task_func, io_req_rw_complete,
- (struct io_tw_req){req}, ts);
- node = next;
- (*count)++;
- if (unlikely(need_resched())) {
- ctx_flush_and_put(ctx, ts);
- ctx = NULL;
- cond_resched();
- }
- } while (node && *count < max_entries);
-
- ctx_flush_and_put(ctx, ts);
- return node;
-}
-
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
-{
- struct io_ring_ctx *last_ctx = NULL;
- struct io_kiocb *req;
-
- while (node) {
- req = container_of(node, struct io_kiocb, io_task_work.node);
- node = node->next;
- if (last_ctx != req->ctx) {
- if (last_ctx) {
- if (sync)
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
- last_ctx = req->ctx;
- percpu_ref_get(&last_ctx->refs);
- }
- if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
- schedule_delayed_work(&last_ctx->fallback_work, 1);
- }
-
- if (last_ctx) {
- if (sync)
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
-}
-
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
-{
- struct llist_node *node = llist_del_all(&tctx->task_list);
-
- __io_fallback_tw(node, sync);
-}
-
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
-{
- struct llist_node *node;
-
- node = llist_del_all(&tctx->task_list);
- if (node) {
- node = llist_reverse_order(node);
- node = io_handle_tw_list(node, count, max_entries);
- }
-
- /* relaxed read is enough as only the task itself sets ->in_cancel */
- if (unlikely(atomic_read(&tctx->in_cancel)))
- io_uring_drop_tctx_refs(current);
-
- trace_io_uring_task_work_run(tctx, *count);
- return node;
-}
-
-void tctx_task_work(struct callback_head *cb)
-{
- struct io_uring_task *tctx;
- struct llist_node *ret;
- unsigned int count = 0;
-
- tctx = container_of(cb, struct io_uring_task, task_work);
- ret = tctx_task_work_run(tctx, UINT_MAX, &count);
- /* can't happen */
- WARN_ON_ONCE(ret);
-}
-
-static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
-{
- struct io_ring_ctx *ctx = req->ctx;
- unsigned nr_wait, nr_tw, nr_tw_prev;
- struct llist_node *head;
-
- /* See comment above IO_CQ_WAKE_INIT */
- BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
-
- /*
- * We don't know how many requests there are in the link and whether
- * they can even be queued lazily, fall back to non-lazy.
- */
- if (req->flags & IO_REQ_LINK_FLAGS)
- flags &= ~IOU_F_TWQ_LAZY_WAKE;
-
- guard(rcu)();
-
- head = READ_ONCE(ctx->work_llist.first);
- do {
- nr_tw_prev = 0;
- if (head) {
- struct io_kiocb *first_req = container_of(head,
- struct io_kiocb,
- io_task_work.node);
- /*
- * Might be executed at any moment, rely on
- * SLAB_TYPESAFE_BY_RCU to keep it alive.
- */
- nr_tw_prev = READ_ONCE(first_req->nr_tw);
- }
-
- /*
- * Theoretically, it can overflow, but that's fine as one of
- * previous adds should've tried to wake the task.
- */
- nr_tw = nr_tw_prev + 1;
- if (!(flags & IOU_F_TWQ_LAZY_WAKE))
- nr_tw = IO_CQ_WAKE_FORCE;
-
- req->nr_tw = nr_tw;
- req->io_task_work.node.next = head;
- } while (!try_cmpxchg(&ctx->work_llist.first, &head,
- &req->io_task_work.node));
-
- /*
- * cmpxchg implies a full barrier, which pairs with the barrier
- * in set_current_state() on the io_cqring_wait() side. It's used
- * to ensure that either we see updated ->cq_wait_nr, or waiters
- * going to sleep will observe the work added to the list, which
- * is similar to the wait/wawke task state sync.
- */
-
- if (!head) {
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
- if (ctx->has_evfd)
- io_eventfd_signal(ctx, false);
- }
-
- nr_wait = atomic_read(&ctx->cq_wait_nr);
- /* not enough or no one is waiting */
- if (nr_tw < nr_wait)
- return;
- /* the previous add has already woken it up */
- if (nr_tw_prev >= nr_wait)
- return;
- wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
-}
-
-static void io_req_normal_work_add(struct io_kiocb *req)
-{
- struct io_uring_task *tctx = req->tctx;
- struct io_ring_ctx *ctx = req->ctx;
-
- /* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.node, &tctx->task_list))
- return;
-
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
- /* SQPOLL doesn't need the task_work added, it'll run it itself */
- if (ctx->flags & IORING_SETUP_SQPOLL) {
- __set_notify_signal(tctx->task);
- return;
- }
-
- if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
- return;
-
- io_fallback_tw(tctx, false);
-}
-
-void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
-{
- if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)
- io_req_local_work_add(req, flags);
- else
- io_req_normal_work_add(req);
-}
-
-void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
-{
- if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
- return;
- __io_req_task_work_add(req, flags);
-}
-
-static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
-{
- struct llist_node *node = llist_del_all(&ctx->work_llist);
-
- __io_fallback_tw(node, false);
- node = llist_del_all(&ctx->retry_llist);
- __io_fallback_tw(node, false);
-}
-
-static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
- int min_events)
-{
- if (!io_local_work_pending(ctx))
- return false;
- if (events < min_events)
- return true;
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
- return false;
-}
-
-static int __io_run_local_work_loop(struct llist_node **node,
- io_tw_token_t tw,
- int events)
-{
- int ret = 0;
-
- while (*node) {
- struct llist_node *next = (*node)->next;
- struct io_kiocb *req = container_of(*node, struct io_kiocb,
- io_task_work.node);
- INDIRECT_CALL_2(req->io_task_work.func,
- io_poll_task_func, io_req_rw_complete,
- (struct io_tw_req){req}, tw);
- *node = next;
- if (++ret >= events)
- break;
- }
-
- return ret;
-}
-
-static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
- int min_events, int max_events)
-{
- struct llist_node *node;
- unsigned int loops = 0;
- int ret = 0;
-
- if (WARN_ON_ONCE(ctx->submitter_task != current))
- return -EEXIST;
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-again:
- tw.cancel = io_should_terminate_tw(ctx);
- min_events -= ret;
- ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
- if (ctx->retry_llist.first)
- goto retry_done;
-
- /*
- * llists are in reverse order, flip it back the right way before
- * running the pending items.
- */
- node = llist_reverse_order(llist_del_all(&ctx->work_llist));
- ret += __io_run_local_work_loop(&node, tw, max_events - ret);
- ctx->retry_llist.first = node;
- loops++;
-
- if (io_run_local_work_continue(ctx, ret, min_events))
- goto again;
-retry_done:
- io_submit_flush_completions(ctx);
- if (io_run_local_work_continue(ctx, ret, min_events))
- goto again;
-
- trace_io_uring_local_work_run(ctx, ret, loops);
- return ret;
-}
-
-static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
- int min_events)
-{
- struct io_tw_state ts = {};
-
- if (!io_local_work_pending(ctx))
- return 0;
- return __io_run_local_work(ctx, ts, min_events,
- max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
-}
-
-int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
-{
- struct io_tw_state ts = {};
- int ret;
-
- mutex_lock(&ctx->uring_lock);
- ret = __io_run_local_work(ctx, ts, min_events, max_events);
- mutex_unlock(&ctx->uring_lock);
- return ret;
-}
-
static void io_req_task_cancel(struct io_tw_req tw_req, io_tw_token_t tw)
{
struct io_kiocb *req = tw_req.req;
@@ -1545,13 +1153,6 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx)
ctx->submit_state.cq_flush = false;
}
-static unsigned io_cqring_events(struct io_ring_ctx *ctx)
-{
- /* See comment at the top of this file */
- smp_rmb();
- return __io_cqring_events(ctx);
-}
-
/*
* We can't just wait for polled events to come to us, we have to actively
* find and complete them.
@@ -1562,7 +1163,7 @@ __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
return;
mutex_lock(&ctx->uring_lock);
- while (!wq_list_empty(&ctx->iopoll_list)) {
+ while (!list_empty(&ctx->iopoll_list)) {
/* let it sleep and repeat later if can't complete a request */
if (io_do_iopoll(ctx, true) == 0)
break;
@@ -1627,21 +1228,18 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned int min_events)
* forever, while the workqueue is stuck trying to acquire the
* very same mutex.
*/
- if (wq_list_empty(&ctx->iopoll_list) ||
- io_task_work_pending(ctx)) {
+ if (list_empty(&ctx->iopoll_list) || io_task_work_pending(ctx)) {
u32 tail = ctx->cached_cq_tail;
(void) io_run_local_work_locked(ctx, min_events);
- if (task_work_pending(current) ||
- wq_list_empty(&ctx->iopoll_list)) {
+ if (task_work_pending(current) || list_empty(&ctx->iopoll_list)) {
mutex_unlock(&ctx->uring_lock);
io_run_task_work();
mutex_lock(&ctx->uring_lock);
}
/* some requests don't go through iopoll_list */
- if (tail != ctx->cached_cq_tail ||
- wq_list_empty(&ctx->iopoll_list))
+ if (tail != ctx->cached_cq_tail || list_empty(&ctx->iopoll_list))
break;
}
ret = io_do_iopoll(ctx, !min_events);
@@ -1684,25 +1282,17 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
* how we do polling eventually, not spinning if we're on potentially
* different devices.
*/
- if (wq_list_empty(&ctx->iopoll_list)) {
+ if (list_empty(&ctx->iopoll_list)) {
ctx->poll_multi_queue = false;
} else if (!ctx->poll_multi_queue) {
struct io_kiocb *list_req;
- list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
- comp_list);
+ list_req = list_first_entry(&ctx->iopoll_list, struct io_kiocb, iopoll_node);
if (list_req->file != req->file)
ctx->poll_multi_queue = true;
}
- /*
- * For fast devices, IO may have already completed. If it has, add
- * it to the front so we find it first.
- */
- if (READ_ONCE(req->iopoll_completed))
- wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
- else
- wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
+ list_add_tail(&req->iopoll_node, &ctx->iopoll_list);
if (unlikely(needs_lock)) {
/*
@@ -2080,6 +1670,8 @@ static inline bool io_check_restriction(struct io_ring_ctx *ctx,
struct io_kiocb *req,
unsigned int sqe_flags)
{
+ if (!ctx->op_restricted)
+ return true;
if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
return false;
@@ -2181,8 +1773,8 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
io_init_drain(ctx);
}
}
- if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
- if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
+ if (unlikely(ctx->op_restricted || ctx->drain_active || ctx->drain_next)) {
+ if (!io_check_restriction(ctx, req, sqe_flags))
return io_init_fail_req(req, -EACCES);
/* knock it to the slow queue path, will be drained there */
if (ctx->drain_active)
@@ -2354,12 +1946,16 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
{
struct io_rings *rings = ctx->rings;
- /*
- * Ensure any loads from the SQEs are done at this point,
- * since once we write the new head, the application could
- * write new data to them.
- */
- smp_store_release(&rings->sq.head, ctx->cached_sq_head);
+ if (ctx->flags & IORING_SETUP_SQ_REWIND) {
+ ctx->cached_sq_head = 0;
+ } else {
+ /*
+ * Ensure any loads from the SQEs are done at this point,
+ * since once we write the new head, the application could
+ * write new data to them.
+ */
+ smp_store_release(&rings->sq.head, ctx->cached_sq_head);
+ }
}
/*
@@ -2405,10 +2001,15 @@ static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
__must_hold(&ctx->uring_lock)
{
- unsigned int entries = io_sqring_entries(ctx);
+ unsigned int entries;
unsigned int left;
int ret;
+ if (ctx->flags & IORING_SETUP_SQ_REWIND)
+ entries = ctx->sq_entries;
+ else
+ entries = io_sqring_entries(ctx);
+
entries = min(nr, entries);
if (unlikely(!entries))
return 0;
@@ -2453,308 +2054,6 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
return ret;
}
-static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
- int wake_flags, void *key)
-{
- struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
-
- /*
- * Cannot safely flush overflowed CQEs from here, ensure we wake up
- * the task, and the next invocation will do it.
- */
- if (io_should_wake(iowq) || io_has_work(iowq->ctx))
- return autoremove_wake_function(curr, mode, wake_flags, key);
- return -1;
-}
-
-int io_run_task_work_sig(struct io_ring_ctx *ctx)
-{
- if (io_local_work_pending(ctx)) {
- __set_current_state(TASK_RUNNING);
- if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
- return 0;
- }
- if (io_run_task_work() > 0)
- return 0;
- if (task_sigpending(current))
- return -EINTR;
- return 0;
-}
-
-static bool current_pending_io(void)
-{
- struct io_uring_task *tctx = current->io_uring;
-
- if (!tctx)
- return false;
- return percpu_counter_read_positive(&tctx->inflight);
-}
-
-static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
-{
- struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
-
- WRITE_ONCE(iowq->hit_timeout, 1);
- iowq->min_timeout = 0;
- wake_up_process(iowq->wq.private);
- return HRTIMER_NORESTART;
-}
-
-/*
- * Doing min_timeout portion. If we saw any timeouts, events, or have work,
- * wake up. If not, and we have a normal timeout, switch to that and keep
- * sleeping.
- */
-static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
-{
- struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
- struct io_ring_ctx *ctx = iowq->ctx;
-
- /* no general timeout, or shorter (or equal), we are done */
- if (iowq->timeout == KTIME_MAX ||
- ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
- goto out_wake;
- /* work we may need to run, wake function will see if we need to wake */
- if (io_has_work(ctx))
- goto out_wake;
- /* got events since we started waiting, min timeout is done */
- if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail))
- goto out_wake;
- /* if we have any events and min timeout expired, we're done */
- if (io_cqring_events(ctx))
- goto out_wake;
-
- /*
- * If using deferred task_work running and application is waiting on
- * more than one request, ensure we reset it now where we are switching
- * to normal sleeps. Any request completion post min_wait should wake
- * the task and return.
- */
- if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
- atomic_set(&ctx->cq_wait_nr, 1);
- smp_mb();
- if (!llist_empty(&ctx->work_llist))
- goto out_wake;
- }
-
- /* any generated CQE posted past this time should wake us up */
- iowq->cq_tail = iowq->cq_min_tail;
-
- hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
- hrtimer_set_expires(timer, iowq->timeout);
- return HRTIMER_RESTART;
-out_wake:
- return io_cqring_timer_wakeup(timer);
-}
-
-static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
- clockid_t clock_id, ktime_t start_time)
-{
- ktime_t timeout;
-
- if (iowq->min_timeout) {
- timeout = ktime_add_ns(iowq->min_timeout, start_time);
- hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
- HRTIMER_MODE_ABS);
- } else {
- timeout = iowq->timeout;
- hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
- HRTIMER_MODE_ABS);
- }
-
- hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
- hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
-
- if (!READ_ONCE(iowq->hit_timeout))
- schedule();
-
- hrtimer_cancel(&iowq->t);
- destroy_hrtimer_on_stack(&iowq->t);
- __set_current_state(TASK_RUNNING);
-
- return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
-}
-
-struct ext_arg {
- size_t argsz;
- struct timespec64 ts;
- const sigset_t __user *sig;
- ktime_t min_time;
- bool ts_set;
- bool iowait;
-};
-
-static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
- struct io_wait_queue *iowq,
- struct ext_arg *ext_arg,
- ktime_t start_time)
-{
- int ret = 0;
-
- /*
- * Mark us as being in io_wait if we have pending requests, so cpufreq
- * can take into account that the task is waiting for IO - turns out
- * to be important for low QD IO.
- */
- if (ext_arg->iowait && current_pending_io())
- current->in_iowait = 1;
- if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
- ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
- else
- schedule();
- current->in_iowait = 0;
- return ret;
-}
-
-/* If this returns > 0, the caller should retry */
-static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
- struct io_wait_queue *iowq,
- struct ext_arg *ext_arg,
- ktime_t start_time)
-{
- if (unlikely(READ_ONCE(ctx->check_cq)))
- return 1;
- if (unlikely(io_local_work_pending(ctx)))
- return 1;
- if (unlikely(task_work_pending(current)))
- return 1;
- if (unlikely(task_sigpending(current)))
- return -EINTR;
- if (unlikely(io_should_wake(iowq)))
- return 0;
-
- return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
-}
-
-/*
- * Wait until events become available, if we don't already have some. The
- * application must reap them itself, as they reside on the shared cq ring.
- */
-static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
- struct ext_arg *ext_arg)
-{
- struct io_wait_queue iowq;
- struct io_rings *rings = ctx->rings;
- ktime_t start_time;
- int ret;
-
- min_events = min_t(int, min_events, ctx->cq_entries);
-
- if (!io_allowed_run_tw(ctx))
- return -EEXIST;
- if (io_local_work_pending(ctx))
- io_run_local_work(ctx, min_events,
- max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
- io_run_task_work();
-
- if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
- io_cqring_do_overflow_flush(ctx);
- if (__io_cqring_events_user(ctx) >= min_events)
- return 0;
-
- init_waitqueue_func_entry(&iowq.wq, io_wake_function);
- iowq.wq.private = current;
- INIT_LIST_HEAD(&iowq.wq.entry);
- iowq.ctx = ctx;
- iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
- iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail);
- iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
- iowq.hit_timeout = 0;
- iowq.min_timeout = ext_arg->min_time;
- iowq.timeout = KTIME_MAX;
- start_time = io_get_time(ctx);
-
- if (ext_arg->ts_set) {
- iowq.timeout = timespec64_to_ktime(ext_arg->ts);
- if (!(flags & IORING_ENTER_ABS_TIMER))
- iowq.timeout = ktime_add(iowq.timeout, start_time);
- }
-
- if (ext_arg->sig) {
-#ifdef CONFIG_COMPAT
- if (in_compat_syscall())
- ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
- ext_arg->argsz);
- else
-#endif
- ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
-
- if (ret)
- return ret;
- }
-
- io_napi_busy_loop(ctx, &iowq);
-
- trace_io_uring_cqring_wait(ctx, min_events);
- do {
- unsigned long check_cq;
- int nr_wait;
-
- /* if min timeout has been hit, don't reset wait count */
- if (!iowq.hit_timeout)
- nr_wait = (int) iowq.cq_tail -
- READ_ONCE(ctx->rings->cq.tail);
- else
- nr_wait = 1;
-
- if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
- atomic_set(&ctx->cq_wait_nr, nr_wait);
- set_current_state(TASK_INTERRUPTIBLE);
- } else {
- prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
- TASK_INTERRUPTIBLE);
- }
-
- ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
- __set_current_state(TASK_RUNNING);
- atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
-
- /*
- * Run task_work after scheduling and before io_should_wake().
- * If we got woken because of task_work being processed, run it
- * now rather than let the caller do another wait loop.
- */
- if (io_local_work_pending(ctx))
- io_run_local_work(ctx, nr_wait, nr_wait);
- io_run_task_work();
-
- /*
- * Non-local task_work will be run on exit to userspace, but
- * if we're using DEFER_TASKRUN, then we could have waited
- * with a timeout for a number of requests. If the timeout
- * hits, we could have some requests ready to process. Ensure
- * this break is _after_ we have run task_work, to avoid
- * deferring running potentially pending requests until the
- * next time we wait for events.
- */
- if (ret < 0)
- break;
-
- check_cq = READ_ONCE(ctx->check_cq);
- if (unlikely(check_cq)) {
- /* let the caller flush overflows, retry */
- if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
- io_cqring_do_overflow_flush(ctx);
- if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
- ret = -EBADR;
- break;
- }
- }
-
- if (io_should_wake(&iowq)) {
- ret = 0;
- break;
- }
- cond_resched();
- } while (1);
-
- if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
- finish_wait(&ctx->cq_wait, &iowq.wq);
- restore_saved_sigmask_unless(ret == -EINTR);
-
- return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
-}
-
static void io_rings_free(struct io_ring_ctx *ctx)
{
io_free_region(ctx->user, &ctx->sq_region);
@@ -2984,7 +2283,7 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb)
static __cold void io_ring_exit_work(struct work_struct *work)
{
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
- unsigned long timeout = jiffies + HZ * 60 * 5;
+ unsigned long timeout = jiffies + IO_URING_EXIT_WAIT_MAX;
unsigned long interval = HZ / 20;
struct io_tctx_exit exit;
struct io_tctx_node *node;
@@ -3256,7 +2555,11 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
ctx = file->private_data;
ret = -EBADFD;
- if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
+ /*
+ * Keep IORING_SETUP_R_DISABLED check before submitter_task load
+ * in io_uring_add_tctx_node() -> __io_uring_add_tctx_node_from_submit()
+ */
+ if (unlikely(smp_load_acquire(&ctx->flags) & IORING_SETUP_R_DISABLED))
goto out;
/*
@@ -3439,6 +2742,12 @@ static int io_uring_sanitise_params(struct io_uring_params *p)
if (flags & ~IORING_SETUP_FLAGS)
return -EINVAL;
+ if (flags & IORING_SETUP_SQ_REWIND) {
+ if ((flags & IORING_SETUP_SQPOLL) ||
+ !(flags & IORING_SETUP_NO_SQARRAY))
+ return -EINVAL;
+ }
+
/* There is no way to mmap rings without a real fd */
if ((flags & IORING_SETUP_REGISTERED_FD_ONLY) &&
!(flags & IORING_SETUP_NO_MMAP))
@@ -3661,13 +2970,8 @@ static __cold int io_uring_create(struct io_ctx_config *config)
}
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
- && !(ctx->flags & IORING_SETUP_R_DISABLED)) {
- /*
- * Unlike io_register_enable_rings(), don't need WRITE_ONCE()
- * since ctx isn't yet accessible from other tasks
- */
+ && !(ctx->flags & IORING_SETUP_R_DISABLED))
ctx->submitter_task = get_task_struct(current);
- }
file = io_uring_get_file(ctx);
if (IS_ERR(file)) {