diff options
Diffstat (limited to 'io_uring')
-rw-r--r-- | io_uring/alloc_cache.h | 40 | ||||
-rw-r--r-- | io_uring/filetable.c | 24 | ||||
-rw-r--r-- | io_uring/io-wq.c | 538 | ||||
-rw-r--r-- | io_uring/io_uring.c | 356 | ||||
-rw-r--r-- | io_uring/io_uring.h | 49 | ||||
-rw-r--r-- | io_uring/kbuf.c | 167 | ||||
-rw-r--r-- | io_uring/kbuf.h | 7 | ||||
-rw-r--r-- | io_uring/msg_ring.c | 4 | ||||
-rw-r--r-- | io_uring/net.c | 29 | ||||
-rw-r--r-- | io_uring/net.h | 5 | ||||
-rw-r--r-- | io_uring/notif.c | 8 | ||||
-rw-r--r-- | io_uring/notif.h | 3 | ||||
-rw-r--r-- | io_uring/poll.c | 33 | ||||
-rw-r--r-- | io_uring/rsrc.c | 364 | ||||
-rw-r--r-- | io_uring/rsrc.h | 82 | ||||
-rw-r--r-- | io_uring/rw.c | 45 | ||||
-rw-r--r-- | io_uring/slist.h | 5 | ||||
-rw-r--r-- | io_uring/sqpoll.c | 1 | ||||
-rw-r--r-- | io_uring/timeout.c | 71 | ||||
-rw-r--r-- | io_uring/uring_cmd.c | 28 |
20 files changed, 989 insertions, 870 deletions
diff --git a/io_uring/alloc_cache.h b/io_uring/alloc_cache.h index 729793ae9712..241245cb54a6 100644 --- a/io_uring/alloc_cache.h +++ b/io_uring/alloc_cache.h @@ -7,46 +7,60 @@ #define IO_ALLOC_CACHE_MAX 512 struct io_cache_entry { - struct hlist_node node; + struct io_wq_work_node node; }; static inline bool io_alloc_cache_put(struct io_alloc_cache *cache, struct io_cache_entry *entry) { - if (cache->nr_cached < IO_ALLOC_CACHE_MAX) { + if (cache->nr_cached < cache->max_cached) { cache->nr_cached++; - hlist_add_head(&entry->node, &cache->list); + wq_stack_add_head(&entry->node, &cache->list); + /* KASAN poisons object */ + kasan_slab_free_mempool(entry); return true; } return false; } +static inline bool io_alloc_cache_empty(struct io_alloc_cache *cache) +{ + return !cache->list.next; +} + static inline struct io_cache_entry *io_alloc_cache_get(struct io_alloc_cache *cache) { - if (!hlist_empty(&cache->list)) { - struct hlist_node *node = cache->list.first; + if (cache->list.next) { + struct io_cache_entry *entry; - hlist_del(node); - return container_of(node, struct io_cache_entry, node); + entry = container_of(cache->list.next, struct io_cache_entry, node); + kasan_unpoison_range(entry, cache->elem_size); + cache->list.next = cache->list.next->next; + cache->nr_cached--; + return entry; } return NULL; } -static inline void io_alloc_cache_init(struct io_alloc_cache *cache) +static inline void io_alloc_cache_init(struct io_alloc_cache *cache, + unsigned max_nr, size_t size) { - INIT_HLIST_HEAD(&cache->list); + cache->list.next = NULL; cache->nr_cached = 0; + cache->max_cached = max_nr; + cache->elem_size = size; } static inline void io_alloc_cache_free(struct io_alloc_cache *cache, void (*free)(struct io_cache_entry *)) { - while (!hlist_empty(&cache->list)) { - struct hlist_node *node = cache->list.first; + while (1) { + struct io_cache_entry *entry = io_alloc_cache_get(cache); - hlist_del(node); - free(container_of(node, struct io_cache_entry, node)); + if (!entry) + break; + free(entry); } cache->nr_cached = 0; } diff --git a/io_uring/filetable.c b/io_uring/filetable.c index 68dfc6936aa7..0f6fa791a47d 100644 --- a/io_uring/filetable.c +++ b/io_uring/filetable.c @@ -19,6 +19,9 @@ static int io_file_bitmap_get(struct io_ring_ctx *ctx) unsigned long nr = ctx->file_alloc_end; int ret; + if (!table->bitmap) + return -ENFILE; + do { ret = find_next_zero_bit(table->bitmap, nr, table->alloc_hint); if (ret != nr) @@ -61,7 +64,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file, u32 slot_index) __must_hold(&req->ctx->uring_lock) { - bool needs_switch = false; struct io_fixed_file *file_slot; int ret; @@ -78,18 +80,13 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file, if (file_slot->file_ptr) { struct file *old_file; - ret = io_rsrc_node_switch_start(ctx); - if (ret) - goto err; - old_file = (struct file *)(file_slot->file_ptr & FFS_MASK); - ret = io_queue_rsrc_removal(ctx->file_data, slot_index, - ctx->rsrc_node, old_file); + ret = io_queue_rsrc_removal(ctx->file_data, slot_index, old_file); if (ret) - goto err; + return ret; + file_slot->file_ptr = 0; io_file_bitmap_clear(&ctx->file_table, slot_index); - needs_switch = true; } ret = io_scm_file_account(ctx, file); @@ -98,9 +95,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file, io_fixed_file_set(file_slot, file); io_file_bitmap_set(&ctx->file_table, slot_index); } -err: - if (needs_switch) - io_rsrc_node_switch(ctx, ctx->file_data); return ret; } @@ -153,9 +147,6 @@ int io_fixed_fd_remove(struct io_ring_ctx *ctx, unsigned int offset) return -ENXIO; if (offset >= ctx->nr_user_files) return -EINVAL; - ret = io_rsrc_node_switch_start(ctx); - if (ret) - return ret; offset = array_index_nospec(offset, ctx->nr_user_files); file_slot = io_fixed_file_slot(&ctx->file_table, offset); @@ -163,13 +154,12 @@ int io_fixed_fd_remove(struct io_ring_ctx *ctx, unsigned int offset) return -EBADF; file = (struct file *)(file_slot->file_ptr & FFS_MASK); - ret = io_queue_rsrc_removal(ctx->file_data, offset, ctx->rsrc_node, file); + ret = io_queue_rsrc_removal(ctx->file_data, offset, file); if (ret) return ret; file_slot->file_ptr = 0; io_file_bitmap_clear(&ctx->file_table, offset); - io_rsrc_node_switch(ctx, ctx->file_data); return 0; } diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index 411bb2d1acd4..b2715988791e 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -15,6 +15,7 @@ #include <linux/cpu.h> #include <linux/task_work.h> #include <linux/audit.h> +#include <linux/mmu_context.h> #include <uapi/linux/io_uring.h> #include "io-wq.h" @@ -39,7 +40,7 @@ enum { }; /* - * One for each thread in a wqe pool + * One for each thread in a wq pool */ struct io_worker { refcount_t ref; @@ -47,7 +48,7 @@ struct io_worker { struct hlist_nulls_node nulls_node; struct list_head all_list; struct task_struct *task; - struct io_wqe *wqe; + struct io_wq *wq; struct io_wq_work *cur_work; struct io_wq_work *next_work; @@ -73,7 +74,7 @@ struct io_worker { #define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER) -struct io_wqe_acct { +struct io_wq_acct { unsigned nr_workers; unsigned max_workers; int index; @@ -90,26 +91,6 @@ enum { }; /* - * Per-node worker thread pool - */ -struct io_wqe { - raw_spinlock_t lock; - struct io_wqe_acct acct[IO_WQ_ACCT_NR]; - - int node; - - struct hlist_nulls_head free_list; - struct list_head all_list; - - struct wait_queue_entry wait; - - struct io_wq *wq; - struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; - - cpumask_var_t cpu_mask; -}; - -/* * Per io_wq state */ struct io_wq { @@ -127,7 +108,19 @@ struct io_wq { struct task_struct *task; - struct io_wqe *wqes[]; + struct io_wq_acct acct[IO_WQ_ACCT_NR]; + + /* lock protects access to elements below */ + raw_spinlock_t lock; + + struct hlist_nulls_head free_list; + struct list_head all_list; + + struct wait_queue_entry wait; + + struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; + + cpumask_var_t cpu_mask; }; static enum cpuhp_state io_wq_online; @@ -140,10 +133,10 @@ struct io_cb_cancel_data { bool cancel_all; }; -static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); -static void io_wqe_dec_running(struct io_worker *worker); -static bool io_acct_cancel_pending_work(struct io_wqe *wqe, - struct io_wqe_acct *acct, +static bool create_io_worker(struct io_wq *wq, int index); +static void io_wq_dec_running(struct io_worker *worker); +static bool io_acct_cancel_pending_work(struct io_wq *wq, + struct io_wq_acct *acct, struct io_cb_cancel_data *match); static void create_worker_cb(struct callback_head *cb); static void io_wq_cancel_tw_create(struct io_wq *wq); @@ -159,20 +152,20 @@ static void io_worker_release(struct io_worker *worker) complete(&worker->ref_done); } -static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound) +static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound) { - return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND]; + return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND]; } -static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, - struct io_wq_work *work) +static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq, + struct io_wq_work *work) { - return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND)); + return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND)); } -static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker) +static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker) { - return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND); + return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND); } static void io_worker_ref_put(struct io_wq *wq) @@ -183,14 +176,13 @@ static void io_worker_ref_put(struct io_wq *wq) static void io_worker_cancel_cb(struct io_worker *worker) { - struct io_wqe_acct *acct = io_wqe_get_acct(worker); - struct io_wqe *wqe = worker->wqe; - struct io_wq *wq = wqe->wq; + struct io_wq_acct *acct = io_wq_get_acct(worker); + struct io_wq *wq = worker->wq; atomic_dec(&acct->nr_running); - raw_spin_lock(&worker->wqe->lock); + raw_spin_lock(&wq->lock); acct->nr_workers--; - raw_spin_unlock(&worker->wqe->lock); + raw_spin_unlock(&wq->lock); io_worker_ref_put(wq); clear_bit_unlock(0, &worker->create_state); io_worker_release(worker); @@ -208,8 +200,7 @@ static bool io_task_worker_match(struct callback_head *cb, void *data) static void io_worker_exit(struct io_worker *worker) { - struct io_wqe *wqe = worker->wqe; - struct io_wq *wq = wqe->wq; + struct io_wq *wq = worker->wq; while (1) { struct callback_head *cb = task_work_cancel_match(wq->task, @@ -223,23 +214,23 @@ static void io_worker_exit(struct io_worker *worker) io_worker_release(worker); wait_for_completion(&worker->ref_done); - raw_spin_lock(&wqe->lock); + raw_spin_lock(&wq->lock); if (worker->flags & IO_WORKER_F_FREE) hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); - raw_spin_unlock(&wqe->lock); - io_wqe_dec_running(worker); + raw_spin_unlock(&wq->lock); + io_wq_dec_running(worker); worker->flags = 0; preempt_disable(); current->flags &= ~PF_IO_WORKER; preempt_enable(); kfree_rcu(worker, rcu); - io_worker_ref_put(wqe->wq); + io_worker_ref_put(wq); do_exit(0); } -static inline bool io_acct_run_queue(struct io_wqe_acct *acct) +static inline bool io_acct_run_queue(struct io_wq_acct *acct) { bool ret = false; @@ -256,8 +247,8 @@ static inline bool io_acct_run_queue(struct io_wqe_acct *acct) * Check head of free list for an available worker. If one isn't available, * caller must create one. */ -static bool io_wqe_activate_free_worker(struct io_wqe *wqe, - struct io_wqe_acct *acct) +static bool io_wq_activate_free_worker(struct io_wq *wq, + struct io_wq_acct *acct) __must_hold(RCU) { struct hlist_nulls_node *n; @@ -268,10 +259,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe, * activate. If a given worker is on the free_list but in the process * of exiting, keep trying. */ - hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) { + hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) { if (!io_worker_get(worker)) continue; - if (io_wqe_get_acct(worker) != acct) { + if (io_wq_get_acct(worker) != acct) { io_worker_release(worker); continue; } @@ -289,7 +280,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe, * We need a worker. If we find a free one, we're good. If not, and we're * below the max number of workers, create one. */ -static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) +static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct) { /* * Most likely an attempt to queue unbounded work on an io_wq that @@ -298,21 +289,21 @@ static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) if (unlikely(!acct->max_workers)) pr_warn_once("io-wq is not configured for unbound workers"); - raw_spin_lock(&wqe->lock); + raw_spin_lock(&wq->lock); if (acct->nr_workers >= acct->max_workers) { - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); return true; } acct->nr_workers++; - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); - return create_io_worker(wqe->wq, wqe, acct->index); + atomic_inc(&wq->worker_refs); + return create_io_worker(wq, acct->index); } -static void io_wqe_inc_running(struct io_worker *worker) +static void io_wq_inc_running(struct io_worker *worker) { - struct io_wqe_acct *acct = io_wqe_get_acct(worker); + struct io_wq_acct *acct = io_wq_get_acct(worker); atomic_inc(&acct->nr_running); } @@ -321,22 +312,22 @@ static void create_worker_cb(struct callback_head *cb) { struct io_worker *worker; struct io_wq *wq; - struct io_wqe *wqe; - struct io_wqe_acct *acct; + + struct io_wq_acct *acct; bool do_create = false; worker = container_of(cb, struct io_worker, create_work); - wqe = worker->wqe; - wq = wqe->wq; - acct = &wqe->acct[worker->create_index]; - raw_spin_lock(&wqe->lock); + wq = worker->wq; + acct = &wq->acct[worker->create_index]; + raw_spin_lock(&wq->lock); + if (acct->nr_workers < acct->max_workers) { acct->nr_workers++; do_create = true; } - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); if (do_create) { - create_io_worker(wq, wqe, worker->create_index); + create_io_worker(wq, worker->create_index); } else { atomic_dec(&acct->nr_running); io_worker_ref_put(wq); @@ -346,11 +337,10 @@ static void create_worker_cb(struct callback_head *cb) } static bool io_queue_worker_create(struct io_worker *worker, - struct io_wqe_acct *acct, + struct io_wq_acct *acct, task_work_func_t func) { - struct io_wqe *wqe = worker->wqe; - struct io_wq *wq = wqe->wq; + struct io_wq *wq = worker->wq; /* raced with exit, just ignore create call */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) @@ -392,10 +382,10 @@ fail: return false; } -static void io_wqe_dec_running(struct io_worker *worker) +static void io_wq_dec_running(struct io_worker *worker) { - struct io_wqe_acct *acct = io_wqe_get_acct(worker); - struct io_wqe *wqe = worker->wqe; + struct io_wq_acct *acct = io_wq_get_acct(worker); + struct io_wq *wq = worker->wq; if (!(worker->flags & IO_WORKER_F_UP)) return; @@ -406,7 +396,7 @@ static void io_wqe_dec_running(struct io_worker *worker) return; atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); + atomic_inc(&wq->worker_refs); io_queue_worker_create(worker, acct, create_worker_cb); } @@ -414,29 +404,25 @@ static void io_wqe_dec_running(struct io_worker *worker) * Worker will start processing some work. Move it to the busy list, if * it's currently on the freelist */ -static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker) +static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker) { if (worker->flags & IO_WORKER_F_FREE) { worker->flags &= ~IO_WORKER_F_FREE; - raw_spin_lock(&wqe->lock); + raw_spin_lock(&wq->lock); hlist_nulls_del_init_rcu(&worker->nulls_node); - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); } } /* - * No work, worker going to sleep. Move to freelist, and unuse mm if we - * have one attached. Dropping the mm may potentially sleep, so we drop - * the lock in that case and return success. Since the caller has to - * retry the loop in that case (we changed task state), we don't regrab - * the lock if we return success. + * No work, worker going to sleep. Move to freelist. */ -static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) - __must_hold(wqe->lock) +static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker) + __must_hold(wq->lock) { if (!(worker->flags & IO_WORKER_F_FREE)) { worker->flags |= IO_WORKER_F_FREE; - hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); + hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); } } @@ -445,17 +431,16 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work) return work->flags >> IO_WQ_HASH_SHIFT; } -static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) +static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) { - struct io_wq *wq = wqe->wq; bool ret = false; spin_lock_irq(&wq->hash->wait.lock); - if (list_empty(&wqe->wait.entry)) { - __add_wait_queue(&wq->hash->wait, &wqe->wait); + if (list_empty(&wq->wait.entry)) { + __add_wait_queue(&wq->hash->wait, &wq->wait); if (!test_bit(hash, &wq->hash->map)) { __set_current_state(TASK_RUNNING); - list_del_init(&wqe->wait.entry); + list_del_init(&wq->wait.entry); ret = true; } } @@ -463,14 +448,14 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) return ret; } -static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, +static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct, struct io_worker *worker) __must_hold(acct->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; unsigned int stall_hash = -1U; - struct io_wqe *wqe = worker->wqe; + struct io_wq *wq = worker->wq; wq_list_for_each(node, prev, &acct->work_list) { unsigned int hash; @@ -485,11 +470,11 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, hash = io_get_work_hash(work); /* all items with this hash lie in [work, tail] */ - tail = wqe->hash_tail[hash]; + tail = wq->hash_tail[hash]; /* hashed, can run if not already running */ - if (!test_and_set_bit(hash, &wqe->wq->hash->map)) { - wqe->hash_tail[hash] = NULL; + if (!test_and_set_bit(hash, &wq->hash->map)) { + wq->hash_tail[hash] = NULL; wq_list_cut(&acct->work_list, &tail->list, prev); return work; } @@ -508,12 +493,12 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, */ set_bit(IO_ACCT_STALLED_BIT, &acct->flags); raw_spin_unlock(&acct->lock); - unstalled = io_wait_on_hash(wqe, stall_hash); + unstalled = io_wait_on_hash(wq, stall_hash); raw_spin_lock(&acct->lock); if (unstalled) { clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); - if (wq_has_sleeper(&wqe->wq->hash->wait)) - wake_up(&wqe->wq->hash->wait); + if (wq_has_sleeper(&wq->hash->wait)) + wake_up(&wq->hash->wait); } } @@ -534,13 +519,10 @@ static void io_assign_current_work(struct io_worker *worker, raw_spin_unlock(&worker->lock); } -static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); - static void io_worker_handle_work(struct io_worker *worker) { - struct io_wqe_acct *acct = io_wqe_get_acct(worker); - struct io_wqe *wqe = worker->wqe; - struct io_wq *wq = wqe->wq; + struct io_wq_acct *acct = io_wq_get_acct(worker); + struct io_wq *wq = worker->wq; bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state); do { @@ -557,7 +539,7 @@ static void io_worker_handle_work(struct io_worker *worker) work = io_get_next_work(acct, worker); raw_spin_unlock(&acct->lock); if (work) { - __io_worker_busy(wqe, worker); + __io_worker_busy(wq, worker); /* * Make sure cancelation can find this, even before @@ -595,7 +577,7 @@ static void io_worker_handle_work(struct io_worker *worker) } io_assign_current_work(worker, work); if (linked) - io_wqe_enqueue(wqe, linked); + io_wq_enqueue(wq, linked); if (hash != -1U && !next_hashed) { /* serialize hash clear with wake_up() */ @@ -610,13 +592,12 @@ static void io_worker_handle_work(struct io_worker *worker) } while (1); } -static int io_wqe_worker(void *data) +static int io_wq_worker(void *data) { struct io_worker *worker = data; - struct io_wqe_acct *acct = io_wqe_get_acct(worker); - struct io_wqe *wqe = worker->wqe; - struct io_wq *wq = wqe->wq; - bool last_timeout = false; + struct io_wq_acct *acct = io_wq_get_acct(worker); + struct io_wq *wq = worker->wq; + bool exit_mask = false, last_timeout = false; char buf[TASK_COMM_LEN]; worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); @@ -631,17 +612,20 @@ static int io_wqe_worker(void *data) while (io_acct_run_queue(acct)) io_worker_handle_work(worker); - raw_spin_lock(&wqe->lock); - /* timed out, exit unless we're the last worker */ - if (last_timeout && acct->nr_workers > 1) { + raw_spin_lock(&wq->lock); + /* + * Last sleep timed out. Exit if we're not the last worker, + * or if someone modified our affinity. + */ + if (last_timeout && (exit_mask || acct->nr_workers > 1)) { acct->nr_workers--; - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); __set_current_state(TASK_RUNNING); break; } last_timeout = false; - __io_worker_idle(wqe, worker); - raw_spin_unlock(&wqe->lock); + __io_worker_idle(wq, worker); + raw_spin_unlock(&wq->lock); if (io_run_task_work()) continue; ret = schedule_timeout(WORKER_IDLE_TIMEOUT); @@ -652,7 +636,11 @@ static int io_wqe_worker(void *data) continue; break; } - last_timeout = !ret; + if (!ret) { + last_timeout = true; + exit_mask = !cpumask_test_cpu(raw_smp_processor_id(), + wq->cpu_mask); + } } if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) @@ -676,7 +664,7 @@ void io_wq_worker_running(struct task_struct *tsk) if (worker->flags & IO_WORKER_F_RUNNING) return; worker->flags |= IO_WORKER_F_RUNNING; - io_wqe_inc_running(worker); + io_wq_inc_running(worker); } /* @@ -695,22 +683,21 @@ void io_wq_worker_sleeping(struct task_struct *tsk) return; worker->flags &= ~IO_WORKER_F_RUNNING; - io_wqe_dec_running(worker); + io_wq_dec_running(worker); } -static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker, +static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker, struct task_struct *tsk) { tsk->worker_private = worker; worker->task = tsk; - set_cpus_allowed_ptr(tsk, wqe->cpu_mask); - tsk->flags |= PF_NO_SETAFFINITY; + set_cpus_allowed_ptr(tsk, wq->cpu_mask); - raw_spin_lock(&wqe->lock); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); - list_add_tail_rcu(&worker->all_list, &wqe->all_list); + raw_spin_lock(&wq->lock); + hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); + list_add_tail_rcu(&worker->all_list, &wq->all_list); worker->flags |= IO_WORKER_F_FREE; - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); wake_up_new_task(tsk); } @@ -743,21 +730,21 @@ static void create_worker_cont(struct callback_head *cb) { struct io_worker *worker; struct task_struct *tsk; - struct io_wqe *wqe; + struct io_wq *wq; worker = container_of(cb, struct io_worker, create_work); clear_bit_unlock(0, &worker->create_state); - wqe = worker->wqe; - tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + wq = worker->wq; + tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { - io_init_new_worker(wqe, worker, tsk); + io_init_new_worker(wq, worker, tsk); io_worker_release(worker); return; } else if (!io_should_retry_thread(PTR_ERR(tsk))) { - struct io_wqe_acct *acct = io_wqe_get_acct(worker); + struct io_wq_acct *acct = io_wq_get_acct(worker); atomic_dec(&acct->nr_running); - raw_spin_lock(&wqe->lock); + raw_spin_lock(&wq->lock); acct->nr_workers--; if (!acct->nr_workers) { struct io_cb_cancel_data match = { @@ -765,13 +752,13 @@ static void create_worker_cont(struct callback_head *cb) .cancel_all = true, }; - raw_spin_unlock(&wqe->lock); - while (io_acct_cancel_pending_work(wqe, acct, &match)) + raw_spin_unlock(&wq->lock); + while (io_acct_cancel_pending_work(wq, acct, &match)) ; } else { - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); } - io_worker_ref_put(wqe->wq); + io_worker_ref_put(wq); kfree(worker); return; } @@ -784,42 +771,42 @@ static void create_worker_cont(struct callback_head *cb) static void io_workqueue_create(struct work_struct *work) { struct io_worker *worker = container_of(work, struct io_worker, work); - struct io_wqe_acct *acct = io_wqe_get_acct(worker); + struct io_wq_acct *acct = io_wq_get_acct(worker); if (!io_queue_worker_create(worker, acct, create_worker_cont)) kfree(worker); } -static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +static bool create_io_worker(struct io_wq *wq, int index) { - struct io_wqe_acct *acct = &wqe->acct[index]; + struct io_wq_acct *acct = &wq->acct[index]; struct io_worker *worker; struct task_struct *tsk; __set_current_state(TASK_RUNNING); - worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); + worker = kzalloc(sizeof(*worker), GFP_KERNEL); if (!worker) { fail: atomic_dec(&acct->nr_running); - raw_spin_lock(&wqe->lock); + raw_spin_lock(&wq->lock); acct->nr_workers--; - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); io_worker_ref_put(wq); return false; } refcount_set(&worker->ref, 1); - worker->wqe = wqe; + worker->wq = wq; raw_spin_lock_init(&worker->lock); init_completion(&worker->ref_done); if (index == IO_WQ_ACCT_BOUND) worker->flags |= IO_WORKER_F_BOUND; - tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { - io_init_new_worker(wqe, worker, tsk); + io_init_new_worker(wq, worker, tsk); } else if (!io_should_retry_thread(PTR_ERR(tsk))) { kfree(worker); goto fail; @@ -835,14 +822,14 @@ fail: * Iterate the passed in list and call the specific function for each * worker that isn't exiting */ -static bool io_wq_for_each_worker(struct io_wqe *wqe, +static bool io_wq_for_each_worker(struct io_wq *wq, bool (*func)(struct io_worker *, void *), void *data) { struct io_worker *worker; bool ret = false; - list_for_each_entry_rcu(worker, &wqe->all_list, all_list) { + list_for_each_entry_rcu(worker, &wq->all_list, all_list) { if (io_worker_get(worker)) { /* no task if node is/was offline */ if (worker->task) @@ -863,10 +850,8 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) return false; } -static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) +static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) { - struct io_wq *wq = wqe->wq; - do { work->flags |= IO_WQ_WORK_CANCEL; wq->do_work(work); @@ -874,9 +859,9 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) } while (work); } -static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) +static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work) { - struct io_wqe_acct *acct = io_work_get_acct(wqe, work); + struct io_wq_acct *acct = io_work_get_acct(wq, work); unsigned int hash; struct io_wq_work *tail; @@ -887,8 +872,8 @@ append: } hash = io_get_work_hash(work); - tail = wqe->hash_tail[hash]; - wqe->hash_tail[hash] = work; + tail = wq->hash_tail[hash]; + wq->hash_tail[hash] = work; if (!tail) goto append; @@ -900,9 +885,9 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data) return work == data; } -static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) +void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) { - struct io_wqe_acct *acct = io_work_get_acct(wqe, work); + struct io_wq_acct *acct = io_work_get_acct(wq, work); struct io_cb_cancel_data match; unsigned work_flags = work->flags; bool do_create; @@ -911,55 +896,48 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) * If io-wq is exiting for this task, or if the request has explicitly * been marked as one that should not get executed, cancel it here. */ - if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) || + if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || (work->flags & IO_WQ_WORK_CANCEL)) { - io_run_cancel(work, wqe); + io_run_cancel(work, wq); return; } raw_spin_lock(&acct->lock); - io_wqe_insert_work(wqe, work); + io_wq_insert_work(wq, work); clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); raw_spin_unlock(&acct->lock); - raw_spin_lock(&wqe->lock); + raw_spin_lock(&wq->lock); rcu_read_lock(); - do_create = !io_wqe_activate_free_worker(wqe, acct); + do_create = !io_wq_activate_free_worker(wq, acct); rcu_read_unlock(); - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || !atomic_read(&acct->nr_running))) { bool did_create; - did_create = io_wqe_create_worker(wqe, acct); + did_create = io_wq_create_worker(wq, acct); if (likely(did_create)) return; - raw_spin_lock(&wqe->lock); + raw_spin_lock(&wq->lock); if (acct->nr_workers) { - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); return; } - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&wq->lock); /* fatal condition, failed to create the first worker */ match.fn = io_wq_work_match_item, match.data = work, match.cancel_all = false, - io_acct_cancel_pending_work(wqe, acct, &match); + io_acct_cancel_pending_work(wq, acct, &match); } } -void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) -{ - struct io_wqe *wqe = wq->wqes[numa_node_id()]; - - io_wqe_enqueue(wqe, work); -} - /* * Work items that hash to the same value will not be done in parallel. * Used to limit concurrent writes, generally hashed by inode. @@ -1002,27 +980,27 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) return match->nr_running && !match->cancel_all; } -static inline void io_wqe_remove_pending(struct io_wqe *wqe, +static inline void io_wq_remove_pending(struct io_wq *wq, struct io_wq_work *work, struct io_wq_work_node *prev) { - struct io_wqe_acct *acct = io_work_get_acct(wqe, work); + struct io_wq_acct *acct = io_work_get_acct(wq, work); unsigned int hash = io_get_work_hash(work); struct io_wq_work *prev_work = NULL; - if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) { + if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) { if (prev) prev_work = container_of(prev, struct io_wq_work, list); if (prev_work && io_get_work_hash(prev_work) == hash) - wqe->hash_tail[hash] = prev_work; + wq->hash_tail[hash] = prev_work; else - wqe->hash_tail[hash] = NULL; + wq->hash_tail[hash] = NULL; } wq_list_del(&acct->work_list, &work->list, prev); } -static bool io_acct_cancel_pending_work(struct io_wqe *wqe, - struct io_wqe_acct *acct, +static bool io_acct_cancel_pending_work(struct io_wq *wq, + struct io_wq_acct *acct, struct io_cb_cancel_data *match) { struct io_wq_work_node *node, *prev; @@ -1033,9 +1011,9 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe, work = container_of(node, struct io_wq_work, list); if (!match->fn(work, match->data)) continue; - io_wqe_remove_pending(wqe, work, prev); + io_wq_remove_pending(wq, work, prev); raw_spin_unlock(&acct->lock); - io_run_cancel(work, wqe); + io_run_cancel(work, wq); match->nr_pending++; /* not safe to continue after unlock */ return true; @@ -1045,15 +1023,15 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe, return false; } -static void io_wqe_cancel_pending_work(struct io_wqe *wqe, - struct io_cb_cancel_data *match) +static void io_wq_cancel_pending_work(struct io_wq *wq, + struct io_cb_cancel_data *match) { int i; retry: for (i = 0; i < IO_WQ_ACCT_NR; i++) { - struct io_wqe_acct *acct = io_get_acct(wqe, i == 0); + struct io_wq_acct *acct = io_get_acct(wq, i == 0); - if (io_acct_cancel_pending_work(wqe, acct, match)) { + if (io_acct_cancel_pending_work(wq, acct, match)) { if (match->cancel_all) goto retry; break; @@ -1061,11 +1039,11 @@ retry: } } -static void io_wqe_cancel_running_work(struct io_wqe *wqe, +static void io_wq_cancel_running_work(struct io_wq *wq, struct io_cb_cancel_data *match) { rcu_read_lock(); - io_wq_for_each_worker(wqe, io_wq_worker_cancel, match); + io_wq_for_each_worker(wq, io_wq_worker_cancel, match); rcu_read_unlock(); } @@ -1077,7 +1055,6 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, .data = data, .cancel_all = cancel_all, }; - int node; /* * First check pending list, if we're lucky we can just remove it @@ -1089,22 +1066,18 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, * as an indication that we attempt to signal cancellation. The * completion will run normally in this case. * - * Do both of these while holding the wqe->lock, to ensure that + * Do both of these while holding the wq->lock, to ensure that * we'll find a work item regardless of state. */ - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - io_wqe_cancel_pending_work(wqe, &match); - if (match.nr_pending && !match.cancel_all) - return IO_WQ_CANCEL_OK; - - raw_spin_lock(&wqe->lock); - io_wqe_cancel_running_work(wqe, &match); - raw_spin_unlock(&wqe->lock); - if (match.nr_running && !match.cancel_all) - return IO_WQ_CANCEL_RUNNING; - } + io_wq_cancel_pending_work(wq, &match); + if (match.nr_pending && !match.cancel_all) + return IO_WQ_CANCEL_OK; + + raw_spin_lock(&wq->lock); + io_wq_cancel_running_work(wq, &match); + raw_spin_unlock(&wq->lock); + if (match.nr_running && !match.cancel_all) + return IO_WQ_CANCEL_RUNNING; if (match.nr_running) return IO_WQ_CANCEL_RUNNING; @@ -1113,20 +1086,20 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, return IO_WQ_CANCEL_NOTFOUND; } -static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, +static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { - struct io_wqe *wqe = container_of(wait, struct io_wqe, wait); + struct io_wq *wq = container_of(wait, struct io_wq, wait); int i; list_del_init(&wait->entry); rcu_read_lock(); for (i = 0; i < IO_WQ_ACCT_NR; i++) { - struct io_wqe_acct *acct = &wqe->acct[i]; + struct io_wq_acct *acct = &wq->acct[i]; if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags)) - io_wqe_activate_free_worker(wqe, acct); + io_wq_activate_free_worker(wq, acct); } rcu_read_unlock(); return 1; @@ -1134,7 +1107,7 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) { - int ret, node, i; + int ret, i; struct io_wq *wq; if (WARN_ON_ONCE(!data->free_work || !data->do_work)) @@ -1142,7 +1115,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) if (WARN_ON_ONCE(!bounded)) return ERR_PTR(-EINVAL); - wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL); + wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL); if (!wq) return ERR_PTR(-ENOMEM); ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node); @@ -1155,39 +1128,28 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) wq->do_work = data->do_work; ret = -ENOMEM; - for_each_node(node) { - struct io_wqe *wqe; - int alloc_node = node; - - if (!node_online(alloc_node)) - alloc_node = NUMA_NO_NODE; - wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node); - if (!wqe) - goto err; - wq->wqes[node] = wqe; - if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL)) - goto err; - cpumask_copy(wqe->cpu_mask, cpumask_of_node(node)); - wqe->node = alloc_node; - wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; - wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = - task_rlimit(current, RLIMIT_NPROC); - INIT_LIST_HEAD(&wqe->wait.entry); - wqe->wait.func = io_wqe_hash_wake; - for (i = 0; i < IO_WQ_ACCT_NR; i++) { - struct io_wqe_acct *acct = &wqe->acct[i]; - - acct->index = i; - atomic_set(&acct->nr_running, 0); - INIT_WQ_LIST(&acct->work_list); - raw_spin_lock_init(&acct->lock); - } - wqe->wq = wq; - raw_spin_lock_init(&wqe->lock); - INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0); - INIT_LIST_HEAD(&wqe->all_list); + + if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL)) + goto err; + cpumask_copy(wq->cpu_mask, cpu_possible_mask); + wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; + wq->acct[IO_WQ_ACCT_UNBOUND].max_workers = + task_rlimit(current, RLIMIT_NPROC); + INIT_LIST_HEAD(&wq->wait.entry); + wq->wait.func = io_wq_hash_wake; + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wq_acct *acct = &wq->acct[i]; + + acct->index = i; + atomic_set(&acct->nr_running, 0); + INIT_WQ_LIST(&acct->work_list); + raw_spin_lock_init(&acct->lock); } + raw_spin_lock_init(&wq->lock); + INIT_HLIST_NULLS_HEAD(&wq->free_list, 0); + INIT_LIST_HEAD(&wq->all_list); + wq->task = get_task_struct(data->task); atomic_set(&wq->worker_refs, 1); init_completion(&wq->worker_done); @@ -1195,12 +1157,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) err: io_wq_put_hash(data->hash); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); - for_each_node(node) { - if (!wq->wqes[node]) - continue; - free_cpumask_var(wq->wqes[node]->cpu_mask); - kfree(wq->wqes[node]); - } + + free_cpumask_var(wq->cpu_mask); err_wq: kfree(wq); return ERR_PTR(ret); @@ -1213,7 +1171,7 @@ static bool io_task_work_match(struct callback_head *cb, void *data) if (cb->func != create_worker_cb && cb->func != create_worker_cont) return false; worker = container_of(cb, struct io_worker, create_work); - return worker->wqe->wq == data; + return worker->wq == data; } void io_wq_exit_start(struct io_wq *wq) @@ -1241,48 +1199,35 @@ static void io_wq_cancel_tw_create(struct io_wq *wq) static void io_wq_exit_workers(struct io_wq *wq) { - int node; - if (!wq->task) return; io_wq_cancel_tw_create(wq); rcu_read_lock(); - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL); - } + io_wq_for_each_worker(wq, io_wq_worker_wake, NULL); rcu_read_unlock(); io_worker_ref_put(wq); wait_for_completion(&wq->worker_done); - for_each_node(node) { - spin_lock_irq(&wq->hash->wait.lock); - list_del_init(&wq->wqes[node]->wait.entry); - spin_unlock_irq(&wq->hash->wait.lock); - } + spin_lock_irq(&wq->hash->wait.lock); + list_del_init(&wq->wait.entry); + spin_unlock_irq(&wq->hash->wait.lock); + put_task_struct(wq->task); wq->task = NULL; } static void io_wq_destroy(struct io_wq *wq) { - int node; + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_all, + .cancel_all = true, + }; cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - struct io_cb_cancel_data match = { - .fn = io_wq_work_match_all, - .cancel_all = true, - }; - io_wqe_cancel_pending_work(wqe, &match); - free_cpumask_var(wqe->cpu_mask); - kfree(wqe); - } + io_wq_cancel_pending_work(wq, &match); + free_cpumask_var(wq->cpu_mask); io_wq_put_hash(wq->hash); kfree(wq); } @@ -1305,9 +1250,9 @@ static bool io_wq_worker_affinity(struct io_worker *worker, void *data) struct online_data *od = data; if (od->online) - cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask); + cpumask_set_cpu(od->cpu, worker->wq->cpu_mask); else - cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask); + cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask); return false; } @@ -1317,11 +1262,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online) .cpu = cpu, .online = online }; - int i; rcu_read_lock(); - for_each_node(i) - io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od); + io_wq_for_each_worker(wq, io_wq_worker_affinity, &od); rcu_read_unlock(); return 0; } @@ -1342,18 +1285,13 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node) int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask) { - int i; - rcu_read_lock(); - for_each_node(i) { - struct io_wqe *wqe = wq->wqes[i]; - - if (mask) - cpumask_copy(wqe->cpu_mask, mask); - else - cpumask_copy(wqe->cpu_mask, cpumask_of_node(i)); - } + if (mask) + cpumask_copy(wq->cpu_mask, mask); + else + cpumask_copy(wq->cpu_mask, cpu_possible_mask); rcu_read_unlock(); + return 0; } @@ -1363,9 +1301,9 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask) */ int io_wq_max_workers(struct io_wq *wq, int *new_count) { + struct io_wq_acct *acct; int prev[IO_WQ_ACCT_NR]; - bool first_node = true; - int i, node; + int i; BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND); BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND); @@ -1380,21 +1318,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count) prev[i] = 0; rcu_read_lock(); - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - struct io_wqe_acct *acct; - - raw_spin_lock(&wqe->lock); - for (i = 0; i < IO_WQ_ACCT_NR; i++) { - acct = &wqe->acct[i]; - if (first_node) - prev[i] = max_t(int, acct->max_workers, prev[i]); - if (new_count[i]) - acct->max_workers = new_count[i]; - } - raw_spin_unlock(&wqe->lock); - first_node = false; + + raw_spin_lock(&wq->lock); + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + acct = &wq->acct[i]; + prev[i] = max_t(int, acct->max_workers, prev[i]); + if (new_count[i]) + acct->max_workers = new_count[i]; } + raw_spin_unlock(&wq->lock); rcu_read_unlock(); for (i = 0; i < IO_WQ_ACCT_NR; i++) diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index fd1cc35a1c00..3bca7a79efda 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -72,6 +72,7 @@ #include <linux/io_uring.h> #include <linux/audit.h> #include <linux/security.h> +#include <asm/shmparam.h> #define CREATE_TRACE_POINTS #include <trace/events/io_uring.h> @@ -246,12 +247,12 @@ static __cold void io_fallback_req_func(struct work_struct *work) fallback_work.work); struct llist_node *node = llist_del_all(&ctx->fallback_llist); struct io_kiocb *req, *tmp; - bool locked = true; + struct io_tw_state ts = { .locked = true, }; mutex_lock(&ctx->uring_lock); llist_for_each_entry_safe(req, tmp, node, io_task_work.node) - req->io_task_work.func(req, &locked); - if (WARN_ON_ONCE(!locked)) + req->io_task_work.func(req, &ts); + if (WARN_ON_ONCE(!ts.locked)) return; io_submit_flush_completions(ctx); mutex_unlock(&ctx->uring_lock); @@ -309,13 +310,18 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_LIST_HEAD(&ctx->sqd_list); INIT_LIST_HEAD(&ctx->cq_overflow_list); INIT_LIST_HEAD(&ctx->io_buffers_cache); - io_alloc_cache_init(&ctx->apoll_cache); - io_alloc_cache_init(&ctx->netmsg_cache); + io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX, + sizeof(struct io_rsrc_node)); + io_alloc_cache_init(&ctx->apoll_cache, IO_ALLOC_CACHE_MAX, + sizeof(struct async_poll)); + io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX, + sizeof(struct io_async_msghdr)); init_completion(&ctx->ref_comp); xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1); mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->cq_wait); init_waitqueue_head(&ctx->poll_wq); + init_waitqueue_head(&ctx->rsrc_quiesce_wq); spin_lock_init(&ctx->completion_lock); spin_lock_init(&ctx->timeout_lock); INIT_WQ_LIST(&ctx->iopoll_list); @@ -324,11 +330,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->ltimeout_list); - spin_lock_init(&ctx->rsrc_ref_lock); INIT_LIST_HEAD(&ctx->rsrc_ref_list); - INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work); - init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw); - init_llist_head(&ctx->rsrc_put_llist); init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); ctx->submit_state.free_list.next = NULL; @@ -424,8 +426,14 @@ static void io_prep_async_work(struct io_kiocb *req) if (req->file && !io_req_ffs_set(req)) req->flags |= io_file_get_flags(req->file) << REQ_F_SUPPORT_NOWAIT_BIT; - if (req->flags & REQ_F_ISREG) { - if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL)) + if (req->file && (req->flags & REQ_F_ISREG)) { + bool should_hash = def->hash_reg_file; + + /* don't serialize this request if the fs doesn't need it */ + if (should_hash && (req->file->f_flags & O_DIRECT) && + (req->file->f_mode & FMODE_DIO_PARALLEL_WRITE)) + should_hash = false; + if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL)) io_wq_hash_work(&req->work, file_inode(req->file)); } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) { if (def->unbound_nonreg_file) @@ -450,7 +458,7 @@ static void io_prep_async_link(struct io_kiocb *req) } } -void io_queue_iowq(struct io_kiocb *req, bool *dont_use) +void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use) { struct io_kiocb *link = io_prep_linked_timeout(req); struct io_uring_task *tctx = req->task->io_uring; @@ -620,22 +628,22 @@ static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) io_cqring_wake(ctx); } -static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx) +static void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx) __releases(ctx->completion_lock) { io_commit_cqring(ctx); - __io_cq_unlock(ctx); - io_commit_cqring_flush(ctx); - /* - * As ->task_complete implies that the ring is single tasked, cq_wait - * may only be waited on by the current in io_cqring_wait(), but since - * it will re-check the wakeup conditions once we return we can safely - * skip waking it up. - */ - if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) { - smp_mb(); - __io_cqring_wake(ctx); + if (ctx->task_complete) { + /* + * ->task_complete implies that only current might be waiting + * for CQEs, and obviously, we currently don't. No one is + * waiting, wakeups are futile, skip them. + */ + io_commit_cqring_flush(ctx); + } else { + __io_cq_unlock(ctx); + io_commit_cqring_flush(ctx); + io_cqring_wake(ctx); } } @@ -960,9 +968,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 return true; } -static void __io_req_complete_post(struct io_kiocb *req) +static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { struct io_ring_ctx *ctx = req->ctx; + struct io_rsrc_node *rsrc_node = NULL; io_cq_lock(ctx); if (!(req->flags & REQ_F_CQE_SKIP)) @@ -983,7 +992,7 @@ static void __io_req_complete_post(struct io_kiocb *req) } io_put_kbuf_comp(req); io_dismantle_req(req); - io_req_put_rsrc(req); + rsrc_node = req->rsrc_node; /* * Selected buffer deallocation in io_clean_op() assumes that * we don't hold ->completion_lock. Clean them here to avoid @@ -994,21 +1003,27 @@ static void __io_req_complete_post(struct io_kiocb *req) ctx->locked_free_nr++; } io_cq_unlock_post(ctx); + + if (rsrc_node) { + io_ring_submit_lock(ctx, issue_flags); + io_put_rsrc_node(ctx, rsrc_node); + io_ring_submit_unlock(ctx, issue_flags); + } } void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { - if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) { + if (req->ctx->task_complete && req->ctx->submitter_task != current) { req->io_task_work.func = io_req_task_complete; io_req_task_work_add(req); } else if (!(issue_flags & IO_URING_F_UNLOCKED) || !(req->ctx->flags & IORING_SETUP_IOPOLL)) { - __io_req_complete_post(req); + __io_req_complete_post(req, issue_flags); } else { struct io_ring_ctx *ctx = req->ctx; mutex_lock(&ctx->uring_lock); - __io_req_complete_post(req); + __io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED); mutex_unlock(&ctx->uring_lock); } } @@ -1106,11 +1121,14 @@ static inline void io_dismantle_req(struct io_kiocb *req) io_put_file(req->file); } -__cold void io_free_req(struct io_kiocb *req) +static __cold void io_free_req_tw(struct io_kiocb *req, struct io_tw_state *ts) { struct io_ring_ctx *ctx = req->ctx; - io_req_put_rsrc(req); + if (req->rsrc_node) { + io_tw_lock(ctx, ts); + io_put_rsrc_node(ctx, req->rsrc_node); + } io_dismantle_req(req); io_put_task_remote(req->task, 1); @@ -1120,6 +1138,12 @@ __cold void io_free_req(struct io_kiocb *req) spin_unlock(&ctx->completion_lock); } +__cold void io_free_req(struct io_kiocb *req) +{ + req->io_task_work.func = io_free_req_tw; + io_req_task_work_add(req); +} + static void __io_req_find_next_prep(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; @@ -1146,22 +1170,23 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req) return nxt; } -static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked) +static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts) { if (!ctx) return; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - if (*locked) { + if (ts->locked) { io_submit_flush_completions(ctx); mutex_unlock(&ctx->uring_lock); - *locked = false; + ts->locked = false; } percpu_ref_put(&ctx->refs); } static unsigned int handle_tw_list(struct llist_node *node, - struct io_ring_ctx **ctx, bool *locked, + struct io_ring_ctx **ctx, + struct io_tw_state *ts, struct llist_node *last) { unsigned int count = 0; @@ -1174,18 +1199,17 @@ static unsigned int handle_tw_list(struct llist_node *node, prefetch(container_of(next, struct io_kiocb, io_task_work.node)); if (req->ctx != *ctx) { - ctx_flush_and_put(*ctx, locked); + ctx_flush_and_put(*ctx, ts); *ctx = req->ctx; /* if not contended, grab and improve batching */ - *locked = mutex_trylock(&(*ctx)->uring_lock); + ts->locked = mutex_trylock(&(*ctx)->uring_lock); percpu_ref_get(&(*ctx)->refs); - } else if (!*locked) - *locked = mutex_trylock(&(*ctx)->uring_lock); - req->io_task_work.func(req, locked); + } + req->io_task_work.func(req, ts); node = next; count++; if (unlikely(need_resched())) { - ctx_flush_and_put(*ctx, locked); + ctx_flush_and_put(*ctx, ts); *ctx = NULL; cond_resched(); } @@ -1226,7 +1250,7 @@ static inline struct llist_node *io_llist_cmpxchg(struct llist_head *head, void tctx_task_work(struct callback_head *cb) { - bool uring_locked = false; + struct io_tw_state ts = {}; struct io_ring_ctx *ctx = NULL; struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work); @@ -1243,12 +1267,12 @@ void tctx_task_work(struct callback_head *cb) do { loops++; node = io_llist_xchg(&tctx->task_list, &fake); - count += handle_tw_list(node, &ctx, &uring_locked, &fake); + count += handle_tw_list(node, &ctx, &ts, &fake); /* skip expensive cmpxchg if there are items in the list */ if (READ_ONCE(tctx->task_list.first) != &fake) continue; - if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) { + if (ts.locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) { io_submit_flush_completions(ctx); if (READ_ONCE(tctx->task_list.first) != &fake) continue; @@ -1256,7 +1280,7 @@ void tctx_task_work(struct callback_head *cb) node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); } while (node != &fake); - ctx_flush_and_put(ctx, &uring_locked); + ctx_flush_and_put(ctx, &ts); /* relaxed read is enough as only the task itself sets ->in_cancel */ if (unlikely(atomic_read(&tctx->in_cancel))) @@ -1279,42 +1303,67 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx) } } -static void io_req_local_work_add(struct io_kiocb *req) +static void io_req_local_work_add(struct io_kiocb *req, unsigned flags) { struct io_ring_ctx *ctx = req->ctx; + unsigned nr_wait, nr_tw, nr_tw_prev; + struct llist_node *first; - percpu_ref_get(&ctx->refs); + if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) + flags &= ~IOU_F_TWQ_LAZY_WAKE; - if (!llist_add(&req->io_task_work.node, &ctx->work_llist)) - goto put_ref; - - /* needed for the following wake up */ - smp_mb__after_atomic(); - - if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) { - io_move_task_work_from_local(ctx); - goto put_ref; + first = READ_ONCE(ctx->work_llist.first); + do { + nr_tw_prev = 0; + if (first) { + struct io_kiocb *first_req = container_of(first, + struct io_kiocb, + io_task_work.node); + /* + * Might be executed at any moment, rely on + * SLAB_TYPESAFE_BY_RCU to keep it alive. + */ + nr_tw_prev = READ_ONCE(first_req->nr_tw); + } + nr_tw = nr_tw_prev + 1; + /* Large enough to fail the nr_wait comparison below */ + if (!(flags & IOU_F_TWQ_LAZY_WAKE)) + nr_tw = -1U; + + req->nr_tw = nr_tw; + req->io_task_work.node.next = first; + } while (!try_cmpxchg(&ctx->work_llist.first, &first, + &req->io_task_work.node)); + + if (!first) { + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + if (ctx->has_evfd) + io_eventfd_signal(ctx); } - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - if (ctx->has_evfd) - io_eventfd_signal(ctx); - - if (READ_ONCE(ctx->cq_waiting)) - wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); - -put_ref: - percpu_ref_put(&ctx->refs); + nr_wait = atomic_read(&ctx->cq_wait_nr); + /* no one is waiting */ + if (!nr_wait) + return; + /* either not enough or the previous add has already woken it up */ + if (nr_wait > nr_tw || nr_tw_prev >= nr_wait) + return; + /* pairs with set_current_state() in io_cqring_wait() */ + smp_mb__after_atomic(); + wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); } -void __io_req_task_work_add(struct io_kiocb *req, bool allow_local) +void __io_req_task_work_add(struct io_kiocb *req, unsigned flags) { struct io_uring_task *tctx = req->task->io_uring; struct io_ring_ctx *ctx = req->ctx; - if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - io_req_local_work_add(req); + if (!(flags & IOU_F_TWQ_FORCE_NORMAL) && + (ctx->flags & IORING_SETUP_DEFER_TASKRUN)) { + rcu_read_lock(); + io_req_local_work_add(req, flags); + rcu_read_unlock(); return; } @@ -1341,11 +1390,11 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) io_task_work.node); node = node->next; - __io_req_task_work_add(req, false); + __io_req_task_work_add(req, IOU_F_TWQ_FORCE_NORMAL); } } -static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked) +static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts) { struct llist_node *node; unsigned int loops = 0; @@ -1362,7 +1411,7 @@ again: struct io_kiocb *req = container_of(node, struct io_kiocb, io_task_work.node); prefetch(container_of(next, struct io_kiocb, io_task_work.node)); - req->io_task_work.func(req, locked); + req->io_task_work.func(req, ts); ret++; node = next; } @@ -1370,7 +1419,7 @@ again: if (!llist_empty(&ctx->work_llist)) goto again; - if (*locked) { + if (ts->locked) { io_submit_flush_completions(ctx); if (!llist_empty(&ctx->work_llist)) goto again; @@ -1381,46 +1430,46 @@ again: static inline int io_run_local_work_locked(struct io_ring_ctx *ctx) { - bool locked; + struct io_tw_state ts = { .locked = true, }; int ret; if (llist_empty(&ctx->work_llist)) return 0; - locked = true; - ret = __io_run_local_work(ctx, &locked); + ret = __io_run_local_work(ctx, &ts); /* shouldn't happen! */ - if (WARN_ON_ONCE(!locked)) + if (WARN_ON_ONCE(!ts.locked)) mutex_lock(&ctx->uring_lock); return ret; } static int io_run_local_work(struct io_ring_ctx *ctx) { - bool locked = mutex_trylock(&ctx->uring_lock); + struct io_tw_state ts = {}; int ret; - ret = __io_run_local_work(ctx, &locked); - if (locked) + ts.locked = mutex_trylock(&ctx->uring_lock); + ret = __io_run_local_work(ctx, &ts); + if (ts.locked) mutex_unlock(&ctx->uring_lock); return ret; } -static void io_req_task_cancel(struct io_kiocb *req, bool *locked) +static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts) { - io_tw_lock(req->ctx, locked); + io_tw_lock(req->ctx, ts); io_req_defer_failed(req, req->cqe.res); } -void io_req_task_submit(struct io_kiocb *req, bool *locked) +void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts) { - io_tw_lock(req->ctx, locked); + io_tw_lock(req->ctx, ts); /* req->task == current here, checking PF_EXITING is safe */ if (unlikely(req->task->flags & PF_EXITING)) io_req_defer_failed(req, -EFAULT); else if (req->flags & REQ_F_FORCE_ASYNC) - io_queue_iowq(req, locked); + io_queue_iowq(req, ts); else io_queue_sqe(req); } @@ -1499,14 +1548,14 @@ void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node) static void __io_submit_flush_completions(struct io_ring_ctx *ctx) __must_hold(&ctx->uring_lock) { - struct io_wq_work_node *node, *prev; struct io_submit_state *state = &ctx->submit_state; + struct io_wq_work_node *node; __io_cq_lock(ctx); /* must come first to preserve CQE ordering in failure cases */ if (state->cqes_count) __io_flush_post_cqes(ctx); - wq_list_for_each(node, prev, &state->compl_reqs) { + __wq_list_for_each(node, &state->compl_reqs) { struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); @@ -1646,9 +1695,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) return ret; } -void io_req_task_complete(struct io_kiocb *req, bool *locked) +void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts) { - if (*locked) + if (ts->locked) io_req_complete_defer(req); else io_req_complete_post(req, IO_URING_F_UNLOCKED); @@ -1927,9 +1976,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) return 0; } -int io_poll_issue(struct io_kiocb *req, bool *locked) +int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts) { - io_tw_lock(req->ctx, locked); + io_tw_lock(req->ctx, ts); return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT| IO_URING_F_COMPLETE_DEFER); } @@ -2298,8 +2347,7 @@ static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, if (unlikely(ret)) return io_submit_fail_init(sqe, req, ret); - /* don't need @sqe from now on */ - trace_io_uring_submit_sqe(req, true); + trace_io_uring_submit_req(req); /* * If we already have a head request, queue this one for async @@ -2428,7 +2476,7 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) if (unlikely(!entries)) return 0; /* make sure SQ entry isn't read before tail */ - ret = left = min3(nr, ctx->sq_entries, entries); + ret = left = min(nr, entries); io_get_task_refs(left); io_submit_state_start(&ctx->submit_state, left); @@ -2600,7 +2648,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, unsigned long check_cq; if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - WRITE_ONCE(ctx->cq_waiting, 1); + int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail); + + atomic_set(&ctx->cq_wait_nr, nr_wait); set_current_state(TASK_INTERRUPTIBLE); } else { prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, @@ -2609,7 +2659,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, ret = io_cqring_wait_schedule(ctx, &iowq); __set_current_state(TASK_RUNNING); - WRITE_ONCE(ctx->cq_waiting, 0); + atomic_set(&ctx->cq_wait_nr, 0); if (ret < 0) break; @@ -2772,13 +2822,17 @@ static void io_req_caches_free(struct io_ring_ctx *ctx) mutex_unlock(&ctx->uring_lock); } +static void io_rsrc_node_cache_free(struct io_cache_entry *entry) +{ + kfree(container_of(entry, struct io_rsrc_node, cache)); +} + static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) { io_sq_thread_finish(ctx); - io_rsrc_refs_drop(ctx); /* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */ - io_wait_rsrc_data(ctx->buf_data); - io_wait_rsrc_data(ctx->file_data); + if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list))) + return; mutex_lock(&ctx->uring_lock); if (ctx->buf_data) @@ -2789,8 +2843,8 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) io_eventfd_unregister(ctx); io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free); io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); - mutex_unlock(&ctx->uring_lock); io_destroy_buffers(ctx); + mutex_unlock(&ctx->uring_lock); if (ctx->sq_creds) put_cred(ctx->sq_creds); if (ctx->submitter_task) @@ -2798,14 +2852,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) /* there are no registered resources left, nobody uses it */ if (ctx->rsrc_node) - io_rsrc_node_destroy(ctx->rsrc_node); - if (ctx->rsrc_backup_node) - io_rsrc_node_destroy(ctx->rsrc_backup_node); - flush_delayed_work(&ctx->rsrc_put_work); - flush_delayed_work(&ctx->fallback_work); + io_rsrc_node_destroy(ctx, ctx->rsrc_node); WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)); - WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist)); #if defined(CONFIG_UNIX) if (ctx->ring_sock) { @@ -2815,6 +2864,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) #endif WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list)); + io_alloc_cache_free(&ctx->rsrc_node_cache, io_rsrc_node_cache_free); if (ctx->mm_account) { mmdrop(ctx->mm_account); ctx->mm_account = NULL; @@ -3031,6 +3081,10 @@ static __cold void io_ring_exit_work(struct work_struct *work) spin_lock(&ctx->completion_lock); spin_unlock(&ctx->completion_lock); + /* pairs with RCU read section in io_req_local_work_add() */ + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) + synchronize_rcu(); + io_ring_ctx_free(ctx); } @@ -3146,6 +3200,12 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx, enum io_wq_cancel cret; bool ret = false; + /* set it so io_req_local_work_add() would wake us up */ + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + atomic_set(&ctx->cq_wait_nr, 1); + smp_mb(); + } + /* failed during ring init, it couldn't have issued any requests */ if (!ctx->rings) return false; @@ -3200,6 +3260,8 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd) { struct io_uring_task *tctx = current->io_uring; struct io_ring_ctx *ctx; + struct io_tctx_node *node; + unsigned long index; s64 inflight; DEFINE_WAIT(wait); @@ -3221,9 +3283,6 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd) break; if (!sqd) { - struct io_tctx_node *node; - unsigned long index; - xa_for_each(&tctx->xa, index, node) { /* sqpoll task will cancel all its requests */ if (node->ctx->sq_data) @@ -3246,7 +3305,13 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd) prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE); io_run_task_work(); io_uring_drop_tctx_refs(current); - + xa_for_each(&tctx->xa, index, node) { + if (!llist_empty(&node->ctx->work_llist)) { + WARN_ON_ONCE(node->ctx->submitter_task && + node->ctx->submitter_task != current); + goto end_wait; + } + } /* * If we've seen completions, retry without waiting. This * avoids a race where a completion comes in before we did @@ -3254,6 +3319,7 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd) */ if (inflight == tctx_inflight(tctx, !cancel_all)) schedule(); +end_wait: finish_wait(&tctx->wait, &wait); } while (1); @@ -3282,7 +3348,7 @@ static void *io_uring_validate_mmap_request(struct file *file, struct page *page; void *ptr; - switch (offset) { + switch (offset & IORING_OFF_MMAP_MASK) { case IORING_OFF_SQ_RING: case IORING_OFF_CQ_RING: ptr = ctx->rings; @@ -3290,6 +3356,17 @@ static void *io_uring_validate_mmap_request(struct file *file, case IORING_OFF_SQES: ptr = ctx->sq_sqes; break; + case IORING_OFF_PBUF_RING: { + unsigned int bgid; + + bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT; + mutex_lock(&ctx->uring_lock); + ptr = io_pbuf_get_address(ctx, bgid); + mutex_unlock(&ctx->uring_lock); + if (!ptr) + return ERR_PTR(-EINVAL); + break; + } default: return ERR_PTR(-EINVAL); } @@ -3317,6 +3394,54 @@ static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma) return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot); } +static unsigned long io_uring_mmu_get_unmapped_area(struct file *filp, + unsigned long addr, unsigned long len, + unsigned long pgoff, unsigned long flags) +{ + const unsigned long mmap_end = arch_get_mmap_end(addr, len, flags); + struct vm_unmapped_area_info info; + void *ptr; + + /* + * Do not allow to map to user-provided address to avoid breaking the + * aliasing rules. Userspace is not able to guess the offset address of + * kernel kmalloc()ed memory area. + */ + if (addr) + return -EINVAL; + + ptr = io_uring_validate_mmap_request(filp, pgoff, len); + if (IS_ERR(ptr)) + return -ENOMEM; + + info.flags = VM_UNMAPPED_AREA_TOPDOWN; + info.length = len; + info.low_limit = max(PAGE_SIZE, mmap_min_addr); + info.high_limit = arch_get_mmap_base(addr, current->mm->mmap_base); +#ifdef SHM_COLOUR + info.align_mask = PAGE_MASK & (SHM_COLOUR - 1UL); +#else + info.align_mask = PAGE_MASK & (SHMLBA - 1UL); +#endif + info.align_offset = (unsigned long) ptr; + + /* + * A failed mmap() very likely causes application failure, + * so fall back to the bottom-up function here. This scenario + * can happen with large stack limits and large mmap() + * allocations. + */ + addr = vm_unmapped_area(&info); + if (offset_in_page(addr)) { + info.flags = 0; + info.low_limit = TASK_UNMAPPED_BASE; + info.high_limit = mmap_end; + addr = vm_unmapped_area(&info); + } + + return addr; +} + #else /* !CONFIG_MMU */ static int io_uring_mmap(struct file *file, struct vm_area_struct *vma) @@ -3529,6 +3654,8 @@ static const struct file_operations io_uring_fops = { #ifndef CONFIG_MMU .get_unmapped_area = io_uring_nommu_get_unmapped_area, .mmap_capabilities = io_uring_nommu_mmap_capabilities, +#else + .get_unmapped_area = io_uring_mmu_get_unmapped_area, #endif .poll = io_uring_poll, #ifdef CONFIG_PROC_FS @@ -3755,11 +3882,10 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, ret = io_sq_offload_create(ctx, p); if (ret) goto err; - /* always set a rsrc node */ - ret = io_rsrc_node_switch_start(ctx); + + ret = io_rsrc_init(ctx); if (ret) goto err; - io_rsrc_node_switch(ctx, NULL); memset(&p->sq_off, 0, sizeof(p->sq_off)); p->sq_off.head = offsetof(struct io_rings, sq.head); @@ -4425,7 +4551,7 @@ static int __init io_uring_init(void) io_uring_optable_init(); req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC | - SLAB_ACCOUNT); + SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU); return 0; }; __initcall(io_uring_init); diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 2711865f1e19..25515d69d205 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -16,6 +16,20 @@ #endif enum { + /* don't use deferred task_work */ + IOU_F_TWQ_FORCE_NORMAL = 1, + + /* + * A hint to not wake right away but delay until there are enough of + * tw's queued to match the number of CQEs the task is waiting for. + * + * Must not be used wirh requests generating more than one CQE. + * It's also ignored unless IORING_SETUP_DEFER_TASKRUN is set. + */ + IOU_F_TWQ_LAZY_WAKE = 2, +}; + +enum { IOU_OK = 0, IOU_ISSUE_SKIP_COMPLETE = -EIOCBQUEUED, @@ -48,20 +62,20 @@ static inline bool io_req_ffs_set(struct io_kiocb *req) return req->flags & REQ_F_FIXED_FILE; } -void __io_req_task_work_add(struct io_kiocb *req, bool allow_local); +void __io_req_task_work_add(struct io_kiocb *req, unsigned flags); bool io_is_uring_fops(struct file *file); bool io_alloc_async_data(struct io_kiocb *req); void io_req_task_queue(struct io_kiocb *req); -void io_queue_iowq(struct io_kiocb *req, bool *dont_use); -void io_req_task_complete(struct io_kiocb *req, bool *locked); +void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use); +void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts); void io_req_task_queue_fail(struct io_kiocb *req, int ret); -void io_req_task_submit(struct io_kiocb *req, bool *locked); +void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts); void tctx_task_work(struct callback_head *cb); __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd); int io_uring_alloc_task_context(struct task_struct *task, struct io_ring_ctx *ctx); -int io_poll_issue(struct io_kiocb *req, bool *locked); +int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts); int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr); int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin); void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node); @@ -80,6 +94,8 @@ bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task, #define io_lockdep_assert_cq_locked(ctx) \ do { \ + lockdep_assert(in_task()); \ + \ if (ctx->flags & IORING_SETUP_IOPOLL) { \ lockdep_assert_held(&ctx->uring_lock); \ } else if (!ctx->task_complete) { \ @@ -93,7 +109,7 @@ bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task, static inline void io_req_task_work_add(struct io_kiocb *req) { - __io_req_task_work_add(req, true); + __io_req_task_work_add(req, 0); } #define io_for_each_link(pos, head) \ @@ -228,8 +244,7 @@ static inline void io_poll_wq_wake(struct io_ring_ctx *ctx) poll_to_key(EPOLL_URING_WAKE | EPOLLIN)); } -/* requires smb_mb() prior, see wq_has_sleeper() */ -static inline void __io_cqring_wake(struct io_ring_ctx *ctx) +static inline void io_cqring_wake(struct io_ring_ctx *ctx) { /* * Trigger waitqueue handler on all waiters on our waitqueue. This @@ -241,17 +256,11 @@ static inline void __io_cqring_wake(struct io_ring_ctx *ctx) * waitqueue handlers, we know we have a dependency between eventfd or * epoll and should terminate multishot poll at that point. */ - if (waitqueue_active(&ctx->cq_wait)) + if (wq_has_sleeper(&ctx->cq_wait)) __wake_up(&ctx->cq_wait, TASK_NORMAL, 0, poll_to_key(EPOLL_URING_WAKE | EPOLLIN)); } -static inline void io_cqring_wake(struct io_ring_ctx *ctx) -{ - smp_mb(); - __io_cqring_wake(ctx); -} - static inline bool io_sqring_full(struct io_ring_ctx *ctx) { struct io_rings *r = ctx->rings; @@ -262,9 +271,11 @@ static inline bool io_sqring_full(struct io_ring_ctx *ctx) static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; + unsigned int entries; /* make sure SQ entry isn't read before tail */ - return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; + entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; + return min(entries, ctx->sq_entries); } static inline int io_run_task_work(void) @@ -299,11 +310,11 @@ static inline bool io_task_work_pending(struct io_ring_ctx *ctx) return task_work_pending(current) || !wq_list_empty(&ctx->work_llist); } -static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked) +static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts) { - if (!*locked) { + if (!ts->locked) { mutex_lock(&ctx->uring_lock); - *locked = true; + ts->locked = true; } } diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 3002dc827195..2f0181521c98 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -137,7 +137,8 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len, return NULL; head &= bl->mask; - if (head < IO_BUFFER_LIST_BUF_PER_PAGE) { + /* mmaped buffers are always contig */ + if (bl->is_mmap || head < IO_BUFFER_LIST_BUF_PER_PAGE) { buf = &br->bufs[head]; } else { int off = head & (IO_BUFFER_LIST_BUF_PER_PAGE - 1); @@ -179,7 +180,7 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len, bl = io_buffer_get_list(ctx, req->buf_index); if (likely(bl)) { - if (bl->buf_nr_pages) + if (bl->is_mapped) ret = io_ring_buffer_select(req, len, bl, issue_flags); else ret = io_provided_buffer_select(req, len, bl); @@ -214,31 +215,43 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, if (!nbufs) return 0; - if (bl->buf_nr_pages) { - int j; - + if (bl->is_mapped) { i = bl->buf_ring->tail - bl->head; - for (j = 0; j < bl->buf_nr_pages; j++) - unpin_user_page(bl->buf_pages[j]); - kvfree(bl->buf_pages); - bl->buf_pages = NULL; - bl->buf_nr_pages = 0; + if (bl->is_mmap) { + struct page *page; + + page = virt_to_head_page(bl->buf_ring); + if (put_page_testzero(page)) + free_compound_page(page); + bl->buf_ring = NULL; + bl->is_mmap = 0; + } else if (bl->buf_nr_pages) { + int j; + + for (j = 0; j < bl->buf_nr_pages; j++) + unpin_user_page(bl->buf_pages[j]); + kvfree(bl->buf_pages); + bl->buf_pages = NULL; + bl->buf_nr_pages = 0; + } /* make sure it's seen as empty */ INIT_LIST_HEAD(&bl->buf_list); + bl->is_mapped = 0; return i; } - /* the head kbuf is the list itself */ + /* protects io_buffers_cache */ + lockdep_assert_held(&ctx->uring_lock); + while (!list_empty(&bl->buf_list)) { struct io_buffer *nxt; nxt = list_first_entry(&bl->buf_list, struct io_buffer, list); - list_del(&nxt->list); + list_move(&nxt->list, &ctx->io_buffers_cache); if (++i == nbufs) return i; cond_resched(); } - i++; return i; } @@ -303,7 +316,7 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) if (bl) { ret = -EINVAL; /* can't use provide/remove buffers command on mapped buffers */ - if (!bl->buf_nr_pages) + if (!bl->is_mapped) ret = __io_remove_buffers(ctx, bl, p->nbufs); } io_ring_submit_unlock(ctx, issue_flags); @@ -448,7 +461,7 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) } } /* can't add buffers via this command for a mapped buffer ring */ - if (bl->buf_nr_pages) { + if (bl->is_mapped) { ret = -EINVAL; goto err; } @@ -463,23 +476,87 @@ err: return IOU_OK; } -int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) +static int io_pin_pbuf_ring(struct io_uring_buf_reg *reg, + struct io_buffer_list *bl) { struct io_uring_buf_ring *br; - struct io_uring_buf_reg reg; - struct io_buffer_list *bl, *free_bl = NULL; struct page **pages; int nr_pages; + pages = io_pin_pages(reg->ring_addr, + flex_array_size(br, bufs, reg->ring_entries), + &nr_pages); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + br = page_address(pages[0]); +#ifdef SHM_COLOUR + /* + * On platforms that have specific aliasing requirements, SHM_COLOUR + * is set and we must guarantee that the kernel and user side align + * nicely. We cannot do that if IOU_PBUF_RING_MMAP isn't set and + * the application mmap's the provided ring buffer. Fail the request + * if we, by chance, don't end up with aligned addresses. The app + * should use IOU_PBUF_RING_MMAP instead, and liburing will handle + * this transparently. + */ + if ((reg->ring_addr | (unsigned long) br) & (SHM_COLOUR - 1)) { + int i; + + for (i = 0; i < nr_pages; i++) + unpin_user_page(pages[i]); + return -EINVAL; + } +#endif + bl->buf_pages = pages; + bl->buf_nr_pages = nr_pages; + bl->buf_ring = br; + bl->is_mapped = 1; + bl->is_mmap = 0; + return 0; +} + +static int io_alloc_pbuf_ring(struct io_uring_buf_reg *reg, + struct io_buffer_list *bl) +{ + gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP; + size_t ring_size; + void *ptr; + + ring_size = reg->ring_entries * sizeof(struct io_uring_buf_ring); + ptr = (void *) __get_free_pages(gfp, get_order(ring_size)); + if (!ptr) + return -ENOMEM; + + bl->buf_ring = ptr; + bl->is_mapped = 1; + bl->is_mmap = 1; + return 0; +} + +int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) +{ + struct io_uring_buf_reg reg; + struct io_buffer_list *bl, *free_bl = NULL; + int ret; + if (copy_from_user(®, arg, sizeof(reg))) return -EFAULT; - if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2]) + if (reg.resv[0] || reg.resv[1] || reg.resv[2]) return -EINVAL; - if (!reg.ring_addr) - return -EFAULT; - if (reg.ring_addr & ~PAGE_MASK) + if (reg.flags & ~IOU_PBUF_RING_MMAP) return -EINVAL; + if (!(reg.flags & IOU_PBUF_RING_MMAP)) { + if (!reg.ring_addr) + return -EFAULT; + if (reg.ring_addr & ~PAGE_MASK) + return -EINVAL; + } else { + if (reg.ring_addr) + return -EINVAL; + } + if (!is_power_of_2(reg.ring_entries)) return -EINVAL; @@ -496,7 +573,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) bl = io_buffer_get_list(ctx, reg.bgid); if (bl) { /* if mapped buffer ring OR classic exists, don't allow */ - if (bl->buf_nr_pages || !list_empty(&bl->buf_list)) + if (bl->is_mapped || !list_empty(&bl->buf_list)) return -EEXIST; } else { free_bl = bl = kzalloc(sizeof(*bl), GFP_KERNEL); @@ -504,22 +581,21 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) return -ENOMEM; } - pages = io_pin_pages(reg.ring_addr, - flex_array_size(br, bufs, reg.ring_entries), - &nr_pages); - if (IS_ERR(pages)) { - kfree(free_bl); - return PTR_ERR(pages); + if (!(reg.flags & IOU_PBUF_RING_MMAP)) + ret = io_pin_pbuf_ring(®, bl); + else + ret = io_alloc_pbuf_ring(®, bl); + + if (!ret) { + bl->nr_entries = reg.ring_entries; + bl->mask = reg.ring_entries - 1; + + io_buffer_add_list(ctx, bl, reg.bgid); + return 0; } - br = page_address(pages[0]); - bl->buf_pages = pages; - bl->buf_nr_pages = nr_pages; - bl->nr_entries = reg.ring_entries; - bl->buf_ring = br; - bl->mask = reg.ring_entries - 1; - io_buffer_add_list(ctx, bl, reg.bgid); - return 0; + kfree(free_bl); + return ret; } int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) @@ -529,13 +605,15 @@ int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) if (copy_from_user(®, arg, sizeof(reg))) return -EFAULT; - if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2]) + if (reg.resv[0] || reg.resv[1] || reg.resv[2]) + return -EINVAL; + if (reg.flags) return -EINVAL; bl = io_buffer_get_list(ctx, reg.bgid); if (!bl) return -ENOENT; - if (!bl->buf_nr_pages) + if (!bl->is_mapped) return -EINVAL; __io_remove_buffers(ctx, bl, -1U); @@ -545,3 +623,14 @@ int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) } return 0; } + +void *io_pbuf_get_address(struct io_ring_ctx *ctx, unsigned long bgid) +{ + struct io_buffer_list *bl; + + bl = io_buffer_get_list(ctx, bgid); + if (!bl || !bl->is_mmap) + return NULL; + + return bl->buf_ring; +} diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index c23e15d7d3ca..d14345ef61fc 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -23,6 +23,11 @@ struct io_buffer_list { __u16 nr_entries; __u16 head; __u16 mask; + + /* ring mapped provided buffers */ + __u8 is_mapped; + /* ring mapped provided buffers, but mmap'ed by application */ + __u8 is_mmap; }; struct io_buffer { @@ -50,6 +55,8 @@ unsigned int __io_put_kbuf(struct io_kiocb *req, unsigned issue_flags); void io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags); +void *io_pbuf_get_address(struct io_ring_ctx *ctx, unsigned long bgid); + static inline void io_kbuf_recycle_ring(struct io_kiocb *req) { /* diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c index 8803c0979e2a..85fd7ce5f05b 100644 --- a/io_uring/msg_ring.c +++ b/io_uring/msg_ring.c @@ -202,7 +202,7 @@ static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flag * completes with -EOVERFLOW, then the sender must ensure that a * later IORING_OP_MSG_RING delivers the message. */ - if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) + if (!io_post_aux_cqe(target_ctx, msg->user_data, ret, 0)) ret = -EOVERFLOW; out_unlock: io_double_unlock_ctx(target_ctx); @@ -229,6 +229,8 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) struct io_ring_ctx *ctx = req->ctx; struct file *src_file = msg->src_file; + if (msg->len) + return -EINVAL; if (target_ctx == ctx) return -EINVAL; if (target_ctx->flags & IORING_SETUP_R_DISABLED) diff --git a/io_uring/net.c b/io_uring/net.c index b7f190ca528e..89e839013837 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -47,6 +47,7 @@ struct io_connect { struct sockaddr __user *addr; int addr_len; bool in_progress; + bool seen_econnaborted; }; struct io_sr_msg { @@ -183,8 +184,8 @@ static int io_setup_async_msg(struct io_kiocb *req, async_msg->msg.msg_name = &async_msg->addr; /* if were using fast_iov, set it to the new one */ if (iter_is_iovec(&kmsg->msg.msg_iter) && !kmsg->free_iov) { - size_t fast_idx = kmsg->msg.msg_iter.iov - kmsg->fast_iov; - async_msg->msg.msg_iter.iov = &async_msg->fast_iov[fast_idx]; + size_t fast_idx = iter_iov(&kmsg->msg.msg_iter) - kmsg->fast_iov; + async_msg->msg.msg_iter.__iov = &async_msg->fast_iov[fast_idx]; } return -EAGAIN; @@ -1424,7 +1425,7 @@ int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr)); conn->addr_len = READ_ONCE(sqe->addr2); - conn->in_progress = false; + conn->in_progress = conn->seen_econnaborted = false; return 0; } @@ -1461,18 +1462,24 @@ int io_connect(struct io_kiocb *req, unsigned int issue_flags) ret = __sys_connect_file(req->file, &io->address, connect->addr_len, file_flags); - if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) { + if ((ret == -EAGAIN || ret == -EINPROGRESS || ret == -ECONNABORTED) + && force_nonblock) { if (ret == -EINPROGRESS) { connect->in_progress = true; - } else { - if (req_has_async_data(req)) - return -EAGAIN; - if (io_alloc_async_data(req)) { - ret = -ENOMEM; + return -EAGAIN; + } + if (ret == -ECONNABORTED) { + if (connect->seen_econnaborted) goto out; - } - memcpy(req->async_data, &__io, sizeof(__io)); + connect->seen_econnaborted = true; + } + if (req_has_async_data(req)) + return -EAGAIN; + if (io_alloc_async_data(req)) { + ret = -ENOMEM; + goto out; } + memcpy(req->async_data, &__io, sizeof(__io)); return -EAGAIN; } if (ret == -ERESTARTSYS) diff --git a/io_uring/net.h b/io_uring/net.h index 5ffa11bf5d2e..191009979bcb 100644 --- a/io_uring/net.h +++ b/io_uring/net.h @@ -5,8 +5,8 @@ #include "alloc_cache.h" -#if defined(CONFIG_NET) struct io_async_msghdr { +#if defined(CONFIG_NET) union { struct iovec fast_iov[UIO_FASTIOV]; struct { @@ -22,8 +22,11 @@ struct io_async_msghdr { struct sockaddr __user *uaddr; struct msghdr msg; struct sockaddr_storage addr; +#endif }; +#if defined(CONFIG_NET) + struct io_async_connect { struct sockaddr_storage address; }; diff --git a/io_uring/notif.c b/io_uring/notif.c index 09dfd0832d19..d3e703c37aba 100644 --- a/io_uring/notif.c +++ b/io_uring/notif.c @@ -9,7 +9,7 @@ #include "notif.h" #include "rsrc.h" -static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked) +static void io_notif_complete_tw_ext(struct io_kiocb *notif, struct io_tw_state *ts) { struct io_notif_data *nd = io_notif_to_data(notif); struct io_ring_ctx *ctx = notif->ctx; @@ -21,7 +21,7 @@ static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked) __io_unaccount_mem(ctx->user, nd->account_pages); nd->account_pages = 0; } - io_req_task_complete(notif, locked); + io_req_task_complete(notif, ts); } static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg, @@ -31,7 +31,7 @@ static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg, struct io_kiocb *notif = cmd_to_io_kiocb(nd); if (refcount_dec_and_test(&uarg->refcnt)) - io_req_task_work_add(notif); + __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE); } static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg, @@ -79,7 +79,7 @@ struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx) notif->io_task_work.func = io_req_task_complete; nd = io_notif_to_data(notif); - nd->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN; + nd->uarg.flags = IO_NOTIF_UBUF_FLAGS; nd->uarg.callback = io_tx_ubuf_callback; refcount_set(&nd->uarg.refcnt, 1); return notif; diff --git a/io_uring/notif.h b/io_uring/notif.h index c88c800cd89d..86d32bd9f856 100644 --- a/io_uring/notif.h +++ b/io_uring/notif.h @@ -7,6 +7,7 @@ #include "rsrc.h" +#define IO_NOTIF_UBUF_FLAGS (SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN) #define IO_NOTIF_SPLICE_BATCH 32 struct io_notif_data { @@ -33,7 +34,7 @@ static inline void io_notif_flush(struct io_kiocb *notif) /* drop slot's master ref */ if (refcount_dec_and_test(&nd->uarg.refcnt)) - io_req_task_work_add(notif); + __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE); } static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len) diff --git a/io_uring/poll.c b/io_uring/poll.c index 795facbd0e9f..c90e47dc1e29 100644 --- a/io_uring/poll.c +++ b/io_uring/poll.c @@ -148,7 +148,7 @@ static void io_poll_req_insert_locked(struct io_kiocb *req) hlist_add_head(&req->hash_node, &table->hbs[index].list); } -static void io_poll_tw_hash_eject(struct io_kiocb *req, bool *locked) +static void io_poll_tw_hash_eject(struct io_kiocb *req, struct io_tw_state *ts) { struct io_ring_ctx *ctx = req->ctx; @@ -159,7 +159,7 @@ static void io_poll_tw_hash_eject(struct io_kiocb *req, bool *locked) * already grabbed the mutex for us, but there is a chance it * failed. */ - io_tw_lock(ctx, locked); + io_tw_lock(ctx, ts); hash_del(&req->hash_node); req->flags &= ~REQ_F_HASH_LOCKED; } else { @@ -238,7 +238,7 @@ enum { * req->cqe.res. IOU_POLL_REMOVE_POLL_USE_RES indicates to remove multishot * poll and that the result is stored in req->cqe. */ -static int io_poll_check_events(struct io_kiocb *req, bool *locked) +static int io_poll_check_events(struct io_kiocb *req, struct io_tw_state *ts) { int v; @@ -300,13 +300,13 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked) __poll_t mask = mangle_poll(req->cqe.res & req->apoll_events); - if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data, + if (!io_aux_cqe(req->ctx, ts->locked, req->cqe.user_data, mask, IORING_CQE_F_MORE, false)) { io_req_set_res(req, mask, 0); return IOU_POLL_REMOVE_POLL_USE_RES; } } else { - int ret = io_poll_issue(req, locked); + int ret = io_poll_issue(req, ts); if (ret == IOU_STOP_MULTISHOT) return IOU_POLL_REMOVE_POLL_USE_RES; if (ret < 0) @@ -326,15 +326,15 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked) return IOU_POLL_NO_ACTION; } -static void io_poll_task_func(struct io_kiocb *req, bool *locked) +static void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts) { int ret; - ret = io_poll_check_events(req, locked); + ret = io_poll_check_events(req, ts); if (ret == IOU_POLL_NO_ACTION) return; io_poll_remove_entries(req); - io_poll_tw_hash_eject(req, locked); + io_poll_tw_hash_eject(req, ts); if (req->opcode == IORING_OP_POLL_ADD) { if (ret == IOU_POLL_DONE) { @@ -343,7 +343,7 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked) poll = io_kiocb_to_cmd(req, struct io_poll); req->cqe.res = mangle_poll(req->cqe.res & poll->events); } else if (ret == IOU_POLL_REISSUE) { - io_req_task_submit(req, locked); + io_req_task_submit(req, ts); return; } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) { req->cqe.res = ret; @@ -351,14 +351,14 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked) } io_req_set_res(req, req->cqe.res, 0); - io_req_task_complete(req, locked); + io_req_task_complete(req, ts); } else { - io_tw_lock(req->ctx, locked); + io_tw_lock(req->ctx, ts); if (ret == IOU_POLL_REMOVE_POLL_USE_RES) - io_req_task_complete(req, locked); + io_req_task_complete(req, ts); else if (ret == IOU_POLL_DONE || ret == IOU_POLL_REISSUE) - io_req_task_submit(req, locked); + io_req_task_submit(req, ts); else io_req_defer_failed(req, ret); } @@ -726,6 +726,7 @@ int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags) apoll = io_req_alloc_apoll(req, issue_flags); if (!apoll) return IO_APOLL_ABORTED; + req->flags &= ~(REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL); req->flags |= REQ_F_POLLED; ipt.pt._qproc = io_async_queue_proc; @@ -976,7 +977,7 @@ int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags) struct io_hash_bucket *bucket; struct io_kiocb *preq; int ret2, ret = 0; - bool locked; + struct io_tw_state ts = {}; preq = io_poll_find(ctx, true, &cd, &ctx->cancel_table, &bucket); ret2 = io_poll_disarm(preq); @@ -1026,8 +1027,8 @@ found: req_set_fail(preq); io_req_set_res(preq, -ECANCELED, 0); - locked = !(issue_flags & IO_URING_F_UNLOCKED); - io_req_task_complete(preq, &locked); + ts.locked = !(issue_flags & IO_URING_F_UNLOCKED); + io_req_task_complete(preq, &ts); out: if (ret < 0) { req_set_fail(req); diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index 056f40946ff6..d4c91393e0d3 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -23,25 +23,16 @@ struct io_rsrc_update { u32 offset; }; +static void io_rsrc_buf_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc); +static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc); static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, struct io_mapped_ubuf **pimu, struct page **last_hpage); -#define IO_RSRC_REF_BATCH 100 - /* only define max */ #define IORING_MAX_FIXED_FILES (1U << 20) #define IORING_MAX_REG_BUFFERS (1U << 14) -void io_rsrc_refs_drop(struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) -{ - if (ctx->rsrc_cached_refs) { - io_rsrc_put_node(ctx->rsrc_node, ctx->rsrc_cached_refs); - ctx->rsrc_cached_refs = 0; - } -} - int __io_account_mem(struct user_struct *user, unsigned long nr_pages) { unsigned long page_limit, cur_pages, new_pages; @@ -151,216 +142,129 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slo *slot = NULL; } -void io_rsrc_refs_refill(struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) -{ - ctx->rsrc_cached_refs += IO_RSRC_REF_BATCH; - percpu_ref_get_many(&ctx->rsrc_node->refs, IO_RSRC_REF_BATCH); -} - -static void __io_rsrc_put_work(struct io_rsrc_node *ref_node) -{ - struct io_rsrc_data *rsrc_data = ref_node->rsrc_data; - struct io_ring_ctx *ctx = rsrc_data->ctx; - struct io_rsrc_put *prsrc, *tmp; - - list_for_each_entry_safe(prsrc, tmp, &ref_node->rsrc_list, list) { - list_del(&prsrc->list); - - if (prsrc->tag) { - if (ctx->flags & IORING_SETUP_IOPOLL) { - mutex_lock(&ctx->uring_lock); - io_post_aux_cqe(ctx, prsrc->tag, 0, 0); - mutex_unlock(&ctx->uring_lock); - } else { - io_post_aux_cqe(ctx, prsrc->tag, 0, 0); - } - } - - rsrc_data->do_put(ctx, prsrc); - kfree(prsrc); - } - - io_rsrc_node_destroy(ref_node); - if (atomic_dec_and_test(&rsrc_data->refs)) - complete(&rsrc_data->done); -} - -void io_rsrc_put_work(struct work_struct *work) +static void io_rsrc_put_work(struct io_rsrc_node *node) { - struct io_ring_ctx *ctx; - struct llist_node *node; - - ctx = container_of(work, struct io_ring_ctx, rsrc_put_work.work); - node = llist_del_all(&ctx->rsrc_put_llist); + struct io_rsrc_put *prsrc = &node->item; - while (node) { - struct io_rsrc_node *ref_node; - struct llist_node *next = node->next; + if (prsrc->tag) + io_post_aux_cqe(node->ctx, prsrc->tag, 0, 0); - ref_node = llist_entry(node, struct io_rsrc_node, llist); - __io_rsrc_put_work(ref_node); - node = next; + switch (node->type) { + case IORING_RSRC_FILE: + io_rsrc_file_put(node->ctx, prsrc); + break; + case IORING_RSRC_BUFFER: + io_rsrc_buf_put(node->ctx, prsrc); + break; + default: + WARN_ON_ONCE(1); + break; } } -void io_rsrc_put_tw(struct callback_head *cb) -{ - struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx, - rsrc_put_tw); - - io_rsrc_put_work(&ctx->rsrc_put_work.work); -} - -void io_wait_rsrc_data(struct io_rsrc_data *data) +void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *node) { - if (data && !atomic_dec_and_test(&data->refs)) - wait_for_completion(&data->done); + if (!io_alloc_cache_put(&ctx->rsrc_node_cache, &node->cache)) + kfree(node); } -void io_rsrc_node_destroy(struct io_rsrc_node *ref_node) +void io_rsrc_node_ref_zero(struct io_rsrc_node *node) + __must_hold(&node->ctx->uring_lock) { - percpu_ref_exit(&ref_node->refs); - kfree(ref_node); -} - -static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref) -{ - struct io_rsrc_node *node = container_of(ref, struct io_rsrc_node, refs); - struct io_ring_ctx *ctx = node->rsrc_data->ctx; - unsigned long flags; - bool first_add = false; - unsigned long delay = HZ; - - spin_lock_irqsave(&ctx->rsrc_ref_lock, flags); - node->done = true; - - /* if we are mid-quiesce then do not delay */ - if (node->rsrc_data->quiesce) - delay = 0; + struct io_ring_ctx *ctx = node->ctx; while (!list_empty(&ctx->rsrc_ref_list)) { node = list_first_entry(&ctx->rsrc_ref_list, struct io_rsrc_node, node); /* recycle ref nodes in order */ - if (!node->done) + if (node->refs) break; list_del(&node->node); - first_add |= llist_add(&node->llist, &ctx->rsrc_put_llist); - } - spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags); - if (!first_add) - return; - - if (ctx->submitter_task) { - if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw, - ctx->notify_method)) - return; + if (likely(!node->empty)) + io_rsrc_put_work(node); + io_rsrc_node_destroy(ctx, node); } - mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay); + if (list_empty(&ctx->rsrc_ref_list) && unlikely(ctx->rsrc_quiesce)) + wake_up_all(&ctx->rsrc_quiesce_wq); } -static struct io_rsrc_node *io_rsrc_node_alloc(void) +struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx) { struct io_rsrc_node *ref_node; + struct io_cache_entry *entry; - ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL); - if (!ref_node) - return NULL; - - if (percpu_ref_init(&ref_node->refs, io_rsrc_node_ref_zero, - 0, GFP_KERNEL)) { - kfree(ref_node); - return NULL; - } - INIT_LIST_HEAD(&ref_node->node); - INIT_LIST_HEAD(&ref_node->rsrc_list); - ref_node->done = false; - return ref_node; -} - -void io_rsrc_node_switch(struct io_ring_ctx *ctx, - struct io_rsrc_data *data_to_kill) - __must_hold(&ctx->uring_lock) -{ - WARN_ON_ONCE(!ctx->rsrc_backup_node); - WARN_ON_ONCE(data_to_kill && !ctx->rsrc_node); - - io_rsrc_refs_drop(ctx); - - if (data_to_kill) { - struct io_rsrc_node *rsrc_node = ctx->rsrc_node; - - rsrc_node->rsrc_data = data_to_kill; - spin_lock_irq(&ctx->rsrc_ref_lock); - list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list); - spin_unlock_irq(&ctx->rsrc_ref_lock); - - atomic_inc(&data_to_kill->refs); - percpu_ref_kill(&rsrc_node->refs); - ctx->rsrc_node = NULL; - } - - if (!ctx->rsrc_node) { - ctx->rsrc_node = ctx->rsrc_backup_node; - ctx->rsrc_backup_node = NULL; + entry = io_alloc_cache_get(&ctx->rsrc_node_cache); + if (entry) { + ref_node = container_of(entry, struct io_rsrc_node, cache); + } else { + ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL); + if (!ref_node) + return NULL; } -} -int io_rsrc_node_switch_start(struct io_ring_ctx *ctx) -{ - if (ctx->rsrc_backup_node) - return 0; - ctx->rsrc_backup_node = io_rsrc_node_alloc(); - return ctx->rsrc_backup_node ? 0 : -ENOMEM; + ref_node->ctx = ctx; + ref_node->empty = 0; + ref_node->refs = 1; + return ref_node; } __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, struct io_ring_ctx *ctx) { + struct io_rsrc_node *backup; + DEFINE_WAIT(we); int ret; - /* As we may drop ->uring_lock, other task may have started quiesce */ + /* As We may drop ->uring_lock, other task may have started quiesce */ if (data->quiesce) return -ENXIO; - ret = io_rsrc_node_switch_start(ctx); - if (ret) - return ret; - io_rsrc_node_switch(ctx, data); - /* kill initial ref, already quiesced if zero */ - if (atomic_dec_and_test(&data->refs)) + backup = io_rsrc_node_alloc(ctx); + if (!backup) + return -ENOMEM; + ctx->rsrc_node->empty = true; + ctx->rsrc_node->type = -1; + list_add_tail(&ctx->rsrc_node->node, &ctx->rsrc_ref_list); + io_put_rsrc_node(ctx, ctx->rsrc_node); + ctx->rsrc_node = backup; + + if (list_empty(&ctx->rsrc_ref_list)) return 0; + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + atomic_set(&ctx->cq_wait_nr, 1); + smp_mb(); + } + + ctx->rsrc_quiesce++; data->quiesce = true; - mutex_unlock(&ctx->uring_lock); do { + prepare_to_wait(&ctx->rsrc_quiesce_wq, &we, TASK_INTERRUPTIBLE); + mutex_unlock(&ctx->uring_lock); + ret = io_run_task_work_sig(ctx); if (ret < 0) { - atomic_inc(&data->refs); - /* wait for all works potentially completing data->done */ - flush_delayed_work(&ctx->rsrc_put_work); - reinit_completion(&data->done); mutex_lock(&ctx->uring_lock); + if (list_empty(&ctx->rsrc_ref_list)) + ret = 0; break; } - flush_delayed_work(&ctx->rsrc_put_work); - ret = wait_for_completion_interruptible(&data->done); - if (!ret) { - mutex_lock(&ctx->uring_lock); - if (atomic_read(&data->refs) <= 0) - break; - /* - * it has been revived by another thread while - * we were unlocked - */ - mutex_unlock(&ctx->uring_lock); - } - } while (1); + schedule(); + __set_current_state(TASK_RUNNING); + mutex_lock(&ctx->uring_lock); + ret = 0; + } while (!list_empty(&ctx->rsrc_ref_list)); + + finish_wait(&ctx->rsrc_quiesce_wq, &we); data->quiesce = false; + ctx->rsrc_quiesce--; + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + atomic_set(&ctx->cq_wait_nr, 0); + smp_mb(); + } return ret; } @@ -405,12 +309,12 @@ static __cold void **io_alloc_page_table(size_t size) return table; } -__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx, - rsrc_put_fn *do_put, u64 __user *utags, +__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx, int type, + u64 __user *utags, unsigned nr, struct io_rsrc_data **pdata) { struct io_rsrc_data *data; - int ret = -ENOMEM; + int ret = 0; unsigned i; data = kzalloc(sizeof(*data), GFP_KERNEL); @@ -424,7 +328,7 @@ __cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx, data->nr = nr; data->ctx = ctx; - data->do_put = do_put; + data->rsrc_type = type; if (utags) { ret = -EFAULT; for (i = 0; i < nr; i++) { @@ -435,9 +339,6 @@ __cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx, goto fail; } } - - atomic_set(&data->refs, 1); - init_completion(&data->done); *pdata = data; return 0; fail: @@ -456,7 +357,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, struct file *file; int fd, i, err = 0; unsigned int done; - bool needs_switch = false; if (!ctx->file_data) return -ENXIO; @@ -483,12 +383,11 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, if (file_slot->file_ptr) { file = (struct file *)(file_slot->file_ptr & FFS_MASK); - err = io_queue_rsrc_removal(data, i, ctx->rsrc_node, file); + err = io_queue_rsrc_removal(data, i, file); if (err) break; file_slot->file_ptr = 0; io_file_bitmap_clear(&ctx->file_table, i); - needs_switch = true; } if (fd != -1) { file = fget(fd); @@ -519,9 +418,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, io_file_bitmap_set(&ctx->file_table, i); } } - - if (needs_switch) - io_rsrc_node_switch(ctx, data); return done ? done : err; } @@ -532,7 +428,6 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx, u64 __user *tags = u64_to_user_ptr(up->tags); struct iovec iov, __user *iovs = u64_to_user_ptr(up->data); struct page *last_hpage = NULL; - bool needs_switch = false; __u32 done; int i, err; @@ -543,7 +438,6 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx, for (done = 0; done < nr_args; done++) { struct io_mapped_ubuf *imu; - int offset = up->offset + done; u64 tag = 0; err = io_copy_iov(ctx, &iov, iovs, done); @@ -564,24 +458,20 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx, if (err) break; - i = array_index_nospec(offset, ctx->nr_user_bufs); + i = array_index_nospec(up->offset + done, ctx->nr_user_bufs); if (ctx->user_bufs[i] != ctx->dummy_ubuf) { err = io_queue_rsrc_removal(ctx->buf_data, i, - ctx->rsrc_node, ctx->user_bufs[i]); + ctx->user_bufs[i]); if (unlikely(err)) { io_buffer_unmap(ctx, &imu); break; } ctx->user_bufs[i] = ctx->dummy_ubuf; - needs_switch = true; } ctx->user_bufs[i] = imu; - *io_get_tag_slot(ctx->buf_data, offset) = tag; + *io_get_tag_slot(ctx->buf_data, i) = tag; } - - if (needs_switch) - io_rsrc_node_switch(ctx, ctx->buf_data); return done ? done : err; } @@ -590,13 +480,11 @@ static int __io_register_rsrc_update(struct io_ring_ctx *ctx, unsigned type, unsigned nr_args) { __u32 tmp; - int err; + + lockdep_assert_held(&ctx->uring_lock); if (check_add_overflow(up->offset, nr_args, &tmp)) return -EOVERFLOW; - err = io_rsrc_node_switch_start(ctx); - if (err) - return err; switch (type) { case IORING_RSRC_FILE: @@ -753,20 +641,24 @@ int io_files_update(struct io_kiocb *req, unsigned int issue_flags) return IOU_OK; } -int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, - struct io_rsrc_node *node, void *rsrc) +int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, void *rsrc) { + struct io_ring_ctx *ctx = data->ctx; + struct io_rsrc_node *node = ctx->rsrc_node; u64 *tag_slot = io_get_tag_slot(data, idx); - struct io_rsrc_put *prsrc; - prsrc = kzalloc(sizeof(*prsrc), GFP_KERNEL); - if (!prsrc) + ctx->rsrc_node = io_rsrc_node_alloc(ctx); + if (unlikely(!ctx->rsrc_node)) { + ctx->rsrc_node = node; return -ENOMEM; + } - prsrc->tag = *tag_slot; + node->item.rsrc = rsrc; + node->type = data->rsrc_type; + node->item.tag = *tag_slot; *tag_slot = 0; - prsrc->rsrc = rsrc; - list_add(&prsrc->list, &node->rsrc_list); + list_add_tail(&node->node, &ctx->rsrc_ref_list); + io_put_rsrc_node(ctx, node); return 0; } @@ -794,6 +686,7 @@ void __io_sqe_files_unregister(struct io_ring_ctx *ctx) } #endif io_free_file_tables(&ctx->file_table); + io_file_table_set_alloc_range(ctx, 0, 0); io_rsrc_data_free(ctx->file_data); ctx->file_data = NULL; ctx->nr_user_files = 0; @@ -867,8 +760,7 @@ int __io_scm_file_account(struct io_ring_ctx *ctx, struct file *file) UNIXCB(skb).fp = fpl; skb->sk = sk; - skb->scm_io_uring = 1; - skb->destructor = unix_destruct_scm; + skb->destructor = io_uring_destruct_scm; refcount_add(skb->truesize, &sk->sk_wmem_alloc); } @@ -881,20 +773,14 @@ int __io_scm_file_account(struct io_ring_ctx *ctx, struct file *file) return 0; } -static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc) +static __cold void io_rsrc_file_scm_put(struct io_ring_ctx *ctx, struct file *file) { - struct file *file = prsrc->file; #if defined(CONFIG_UNIX) struct sock *sock = ctx->ring_sock->sk; struct sk_buff_head list, *head = &sock->sk_receive_queue; struct sk_buff *skb; int i; - if (!io_file_need_scm(file)) { - fput(file); - return; - } - __skb_queue_head_init(&list); /* @@ -944,11 +830,19 @@ static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc) __skb_queue_tail(head, skb); spin_unlock_irq(&head->lock); } -#else - fput(file); #endif } +static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc) +{ + struct file *file = prsrc->file; + + if (likely(!io_file_need_scm(file))) + fput(file); + else + io_rsrc_file_scm_put(ctx, file); +} + int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, unsigned nr_args, u64 __user *tags) { @@ -965,10 +859,7 @@ int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, return -EMFILE; if (nr_args > rlimit(RLIMIT_NOFILE)) return -EMFILE; - ret = io_rsrc_node_switch_start(ctx); - if (ret) - return ret; - ret = io_rsrc_data_alloc(ctx, io_rsrc_file_put, tags, nr_args, + ret = io_rsrc_data_alloc(ctx, IORING_RSRC_FILE, tags, nr_args, &ctx->file_data); if (ret) return ret; @@ -1022,7 +913,6 @@ int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, /* default it to the whole table */ io_file_table_set_alloc_range(ctx, 0, ctx->nr_user_files); - io_rsrc_node_switch(ctx, NULL); return 0; fail: __io_sqe_files_unregister(ctx); @@ -1162,17 +1052,14 @@ struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages) pret = pin_user_pages(ubuf, nr_pages, FOLL_WRITE | FOLL_LONGTERM, pages, vmas); if (pret == nr_pages) { - struct file *file = vmas[0]->vm_file; - /* don't support file backed memory */ for (i = 0; i < nr_pages; i++) { - if (vmas[i]->vm_file != file) { - ret = -EINVAL; - break; - } - if (!file) + struct vm_area_struct *vma = vmas[i]; + + if (vma_is_shmem(vma)) continue; - if (!vma_is_shmem(vmas[i]) && !is_file_hugepages(file)) { + if (vma->vm_file && + !is_file_hugepages(vma->vm_file)) { ret = -EOPNOTSUPP; break; } @@ -1235,7 +1122,13 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, } } if (folio) { - folio_put_refs(folio, nr_pages - 1); + /* + * The pages are bound to the folio, it doesn't + * actually unpin them but drops all but one reference, + * which is usually put down by io_buffer_unmap(). + * Note, needs a better helper. + */ + unpin_user_pages(&pages[1], nr_pages - 1); nr_pages = 1; } } @@ -1298,10 +1191,7 @@ int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg, return -EBUSY; if (!nr_args || nr_args > IORING_MAX_REG_BUFFERS) return -EINVAL; - ret = io_rsrc_node_switch_start(ctx); - if (ret) - return ret; - ret = io_rsrc_data_alloc(ctx, io_rsrc_buf_put, tags, nr_args, &data); + ret = io_rsrc_data_alloc(ctx, IORING_RSRC_BUFFER, tags, nr_args, &data); if (ret) return ret; ret = io_buffers_map_alloc(ctx, nr_args); @@ -1338,8 +1228,6 @@ int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg, ctx->buf_data = data; if (ret) __io_sqe_buffers_unregister(ctx); - else - io_rsrc_node_switch(ctx, NULL); return ret; } diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index 2b8743645efc..0a8a95e9b99e 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -4,6 +4,10 @@ #include <net/af_unix.h> +#include "alloc_cache.h" + +#define IO_NODE_ALLOC_CACHE_MAX 32 + #define IO_RSRC_TAG_TABLE_SHIFT (PAGE_SHIFT - 3) #define IO_RSRC_TAG_TABLE_MAX (1U << IO_RSRC_TAG_TABLE_SHIFT) #define IO_RSRC_TAG_TABLE_MASK (IO_RSRC_TAG_TABLE_MAX - 1) @@ -14,7 +18,6 @@ enum { }; struct io_rsrc_put { - struct list_head list; u64 tag; union { void *rsrc; @@ -30,19 +33,20 @@ struct io_rsrc_data { u64 **tags; unsigned int nr; - rsrc_put_fn *do_put; - atomic_t refs; - struct completion done; + u16 rsrc_type; bool quiesce; }; struct io_rsrc_node { - struct percpu_ref refs; + union { + struct io_cache_entry cache; + struct io_ring_ctx *ctx; + }; + int refs; + bool empty; + u16 type; struct list_head node; - struct list_head rsrc_list; - struct io_rsrc_data *rsrc_data; - struct llist_node llist; - bool done; + struct io_rsrc_put item; }; struct io_mapped_ubuf { @@ -54,16 +58,10 @@ struct io_mapped_ubuf { }; void io_rsrc_put_tw(struct callback_head *cb); -void io_rsrc_put_work(struct work_struct *work); -void io_rsrc_refs_refill(struct io_ring_ctx *ctx); -void io_wait_rsrc_data(struct io_rsrc_data *data); -void io_rsrc_node_destroy(struct io_rsrc_node *ref_node); -void io_rsrc_refs_drop(struct io_ring_ctx *ctx); -int io_rsrc_node_switch_start(struct io_ring_ctx *ctx); -int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, - struct io_rsrc_node *node, void *rsrc); -void io_rsrc_node_switch(struct io_ring_ctx *ctx, - struct io_rsrc_data *data_to_kill); +void io_rsrc_node_ref_zero(struct io_rsrc_node *node); +void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *ref_node); +struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx); +int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, void *rsrc); int io_import_fixed(int ddir, struct iov_iter *iter, struct io_mapped_ubuf *imu, @@ -107,36 +105,24 @@ int io_register_rsrc_update(struct io_ring_ctx *ctx, void __user *arg, int io_register_rsrc(struct io_ring_ctx *ctx, void __user *arg, unsigned int size, unsigned int type); -static inline void io_rsrc_put_node(struct io_rsrc_node *node, int nr) +static inline void io_put_rsrc_node(struct io_ring_ctx *ctx, struct io_rsrc_node *node) { - percpu_ref_put_many(&node->refs, nr); -} + lockdep_assert_held(&ctx->uring_lock); -static inline void io_req_put_rsrc(struct io_kiocb *req) -{ - if (req->rsrc_node) - io_rsrc_put_node(req->rsrc_node, 1); + if (node && !--node->refs) + io_rsrc_node_ref_zero(node); } static inline void io_req_put_rsrc_locked(struct io_kiocb *req, struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) { - struct io_rsrc_node *node = req->rsrc_node; - - if (node) { - if (node == ctx->rsrc_node) - ctx->rsrc_cached_refs++; - else - io_rsrc_put_node(node, 1); - } + io_put_rsrc_node(ctx, req->rsrc_node); } -static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx) +static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx, + struct io_rsrc_node *node) { - ctx->rsrc_cached_refs--; - if (unlikely(ctx->rsrc_cached_refs < 0)) - io_rsrc_refs_refill(ctx); + node->refs++; } static inline void io_req_set_rsrc_node(struct io_kiocb *req, @@ -144,15 +130,13 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req, unsigned int issue_flags) { if (!req->rsrc_node) { - req->rsrc_node = ctx->rsrc_node; + io_ring_submit_lock(ctx, issue_flags); - if (!(issue_flags & IO_URING_F_UNLOCKED)) { - lockdep_assert_held(&ctx->uring_lock); + lockdep_assert_held(&ctx->uring_lock); - io_charge_rsrc_node(ctx); - } else { - percpu_ref_get(&req->rsrc_node->refs); - } + req->rsrc_node = ctx->rsrc_node; + io_charge_rsrc_node(ctx, ctx->rsrc_node); + io_ring_submit_unlock(ctx, issue_flags); } } @@ -164,6 +148,12 @@ static inline u64 *io_get_tag_slot(struct io_rsrc_data *data, unsigned int idx) return &data->tags[table_idx][off]; } +static inline int io_rsrc_init(struct io_ring_ctx *ctx) +{ + ctx->rsrc_node = io_rsrc_node_alloc(ctx); + return ctx->rsrc_node ? 0 : -ENOMEM; +} + int io_files_update(struct io_kiocb *req, unsigned int issue_flags); int io_files_update_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); diff --git a/io_uring/rw.c b/io_uring/rw.c index 4c233910e200..3f118ed46e4f 100644 --- a/io_uring/rw.c +++ b/io_uring/rw.c @@ -283,16 +283,16 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res) return res; } -static void io_req_rw_complete(struct io_kiocb *req, bool *locked) +static void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts) { io_req_io_end(req); if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) { - unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED; + unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED; req->cqe.flags |= io_put_kbuf(req, issue_flags); } - io_req_task_complete(req, locked); + io_req_task_complete(req, ts); } static void io_complete_rw(struct kiocb *kiocb, long res) @@ -304,7 +304,7 @@ static void io_complete_rw(struct kiocb *kiocb, long res) return; io_req_set_res(req, io_fixup_rw_res(req, res), 0); req->io_task_work.func = io_req_rw_complete; - io_req_task_work_add(req); + __io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE); } static void io_complete_rw_iopoll(struct kiocb *kiocb, long res) @@ -447,26 +447,25 @@ static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter) ppos = io_kiocb_ppos(kiocb); while (iov_iter_count(iter)) { - struct iovec iovec; + void __user *addr; + size_t len; ssize_t nr; if (iter_is_ubuf(iter)) { - iovec.iov_base = iter->ubuf + iter->iov_offset; - iovec.iov_len = iov_iter_count(iter); + addr = iter->ubuf + iter->iov_offset; + len = iov_iter_count(iter); } else if (!iov_iter_is_bvec(iter)) { - iovec = iov_iter_iovec(iter); + addr = iter_iov_addr(iter); + len = iter_iov_len(iter); } else { - iovec.iov_base = u64_to_user_ptr(rw->addr); - iovec.iov_len = rw->len; + addr = u64_to_user_ptr(rw->addr); + len = rw->len; } - if (ddir == READ) { - nr = file->f_op->read(file, iovec.iov_base, - iovec.iov_len, ppos); - } else { - nr = file->f_op->write(file, iovec.iov_base, - iovec.iov_len, ppos); - } + if (ddir == READ) + nr = file->f_op->read(file, addr, len, ppos); + else + nr = file->f_op->write(file, addr, len, ppos); if (nr < 0) { if (!ret) @@ -482,7 +481,7 @@ static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter) if (!rw->len) break; } - if (nr != iovec.iov_len) + if (nr != len) break; } @@ -503,10 +502,10 @@ static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec, if (!iovec) { unsigned iov_off = 0; - io->s.iter.iov = io->s.fast_iov; - if (iter->iov != fast_iov) { - iov_off = iter->iov - fast_iov; - io->s.iter.iov += iov_off; + io->s.iter.__iov = io->s.fast_iov; + if (iter->__iov != fast_iov) { + iov_off = iter_iov(iter) - fast_iov; + io->s.iter.__iov += iov_off; } if (io->s.fast_iov != fast_iov) memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off, @@ -1002,7 +1001,7 @@ void io_rw_fail(struct io_kiocb *req) int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) { struct io_wq_work_node *pos, *start, *prev; - unsigned int poll_flags = BLK_POLL_NOSLEEP; + unsigned int poll_flags = 0; DEFINE_IO_COMP_BATCH(iob); int nr_events = 0; diff --git a/io_uring/slist.h b/io_uring/slist.h index 7c198a40d5f1..0eb194817242 100644 --- a/io_uring/slist.h +++ b/io_uring/slist.h @@ -3,6 +3,9 @@ #include <linux/io_uring_types.h> +#define __wq_list_for_each(pos, head) \ + for (pos = (head)->first; pos; pos = (pos)->next) + #define wq_list_for_each(pos, prv, head) \ for (pos = (head)->first, prv = NULL; pos; prv = pos, pos = (pos)->next) @@ -113,4 +116,4 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work) return container_of(work->list.next, struct io_wq_work, list); } -#endif // INTERNAL_IO_SLIST_H
\ No newline at end of file +#endif // INTERNAL_IO_SLIST_H diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c index 0119d3f1a556..9db4bc1f521a 100644 --- a/io_uring/sqpoll.c +++ b/io_uring/sqpoll.c @@ -233,7 +233,6 @@ static int io_sq_thread(void *data) set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu)); else set_cpus_allowed_ptr(current, cpu_online_mask); - current->flags |= PF_NO_SETAFFINITY; mutex_lock(&sqd->lock); while (1) { diff --git a/io_uring/timeout.c b/io_uring/timeout.c index 826a51bca3e4..fc950177e2e1 100644 --- a/io_uring/timeout.c +++ b/io_uring/timeout.c @@ -17,6 +17,7 @@ struct io_timeout { struct file *file; u32 off; u32 target_seq; + u32 repeats; struct list_head list; /* head of the link, used by linked timeouts only */ struct io_kiocb *head; @@ -37,8 +38,9 @@ struct io_timeout_rem { static inline bool io_is_timeout_noseq(struct io_kiocb *req) { struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout); + struct io_timeout_data *data = req->async_data; - return !timeout->off; + return !timeout->off || data->flags & IORING_TIMEOUT_MULTISHOT; } static inline void io_put_req(struct io_kiocb *req) @@ -49,6 +51,44 @@ static inline void io_put_req(struct io_kiocb *req) } } +static inline bool io_timeout_finish(struct io_timeout *timeout, + struct io_timeout_data *data) +{ + if (!(data->flags & IORING_TIMEOUT_MULTISHOT)) + return true; + + if (!timeout->off || (timeout->repeats && --timeout->repeats)) + return false; + + return true; +} + +static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer); + +static void io_timeout_complete(struct io_kiocb *req, struct io_tw_state *ts) +{ + struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout); + struct io_timeout_data *data = req->async_data; + struct io_ring_ctx *ctx = req->ctx; + + if (!io_timeout_finish(timeout, data)) { + bool filled; + filled = io_aux_cqe(ctx, ts->locked, req->cqe.user_data, -ETIME, + IORING_CQE_F_MORE, false); + if (filled) { + /* re-arm timer */ + spin_lock_irq(&ctx->timeout_lock); + list_add(&timeout->list, ctx->timeout_list.prev); + data->timer.function = io_timeout_fn; + hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode); + spin_unlock_irq(&ctx->timeout_lock); + return; + } + } + + io_req_task_complete(req, ts); +} + static bool io_kill_timeout(struct io_kiocb *req, int status) __must_hold(&req->ctx->timeout_lock) { @@ -101,9 +141,9 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx) spin_unlock_irq(&ctx->timeout_lock); } -static void io_req_tw_fail_links(struct io_kiocb *link, bool *locked) +static void io_req_tw_fail_links(struct io_kiocb *link, struct io_tw_state *ts) { - io_tw_lock(link->ctx, locked); + io_tw_lock(link->ctx, ts); while (link) { struct io_kiocb *nxt = link->link; long res = -ECANCELED; @@ -112,7 +152,7 @@ static void io_req_tw_fail_links(struct io_kiocb *link, bool *locked) res = link->cqe.res; link->link = NULL; io_req_set_res(link, res, 0); - io_req_task_complete(link, locked); + io_req_task_complete(link, ts); link = nxt; } } @@ -212,7 +252,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) req_set_fail(req); io_req_set_res(req, -ETIME, 0); - req->io_task_work.func = io_req_task_complete; + req->io_task_work.func = io_timeout_complete; io_req_task_work_add(req); return HRTIMER_NORESTART; } @@ -265,9 +305,9 @@ int io_timeout_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd) return 0; } -static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked) +static void io_req_task_link_timeout(struct io_kiocb *req, struct io_tw_state *ts) { - unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED; + unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED; struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout); struct io_kiocb *prev = timeout->prev; int ret = -ENOENT; @@ -282,11 +322,11 @@ static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked) ret = io_try_cancel(req->task->io_uring, &cd, issue_flags); } io_req_set_res(req, ret ?: -ETIME, 0); - io_req_task_complete(req, locked); + io_req_task_complete(req, ts); io_put_req(prev); } else { io_req_set_res(req, -ETIME, 0); - io_req_task_complete(req, locked); + io_req_task_complete(req, ts); } } @@ -470,16 +510,27 @@ static int __io_timeout_prep(struct io_kiocb *req, return -EINVAL; flags = READ_ONCE(sqe->timeout_flags); if (flags & ~(IORING_TIMEOUT_ABS | IORING_TIMEOUT_CLOCK_MASK | - IORING_TIMEOUT_ETIME_SUCCESS)) + IORING_TIMEOUT_ETIME_SUCCESS | + IORING_TIMEOUT_MULTISHOT)) return -EINVAL; /* more than one clock specified is invalid, obviously */ if (hweight32(flags & IORING_TIMEOUT_CLOCK_MASK) > 1) return -EINVAL; + /* multishot requests only make sense with rel values */ + if (!(~flags & (IORING_TIMEOUT_MULTISHOT | IORING_TIMEOUT_ABS))) + return -EINVAL; INIT_LIST_HEAD(&timeout->list); timeout->off = off; if (unlikely(off && !req->ctx->off_timeout_used)) req->ctx->off_timeout_used = true; + /* + * for multishot reqs w/ fixed nr of repeats, repeats tracks the + * remaining nr + */ + timeout->repeats = 0; + if ((flags & IORING_TIMEOUT_MULTISHOT) && off > 0) + timeout->repeats = off; if (WARN_ON_ONCE(req_has_async_data(req))) return -EFAULT; diff --git a/io_uring/uring_cmd.c b/io_uring/uring_cmd.c index 446a189b78b0..5113c9a48583 100644 --- a/io_uring/uring_cmd.c +++ b/io_uring/uring_cmd.c @@ -12,15 +12,16 @@ #include "rsrc.h" #include "uring_cmd.h" -static void io_uring_cmd_work(struct io_kiocb *req, bool *locked) +static void io_uring_cmd_work(struct io_kiocb *req, struct io_tw_state *ts) { struct io_uring_cmd *ioucmd = io_kiocb_to_cmd(req, struct io_uring_cmd); + unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED; - ioucmd->task_work_cb(ioucmd); + ioucmd->task_work_cb(ioucmd, issue_flags); } void io_uring_cmd_complete_in_task(struct io_uring_cmd *ioucmd, - void (*task_work_cb)(struct io_uring_cmd *)) + void (*task_work_cb)(struct io_uring_cmd *, unsigned)) { struct io_kiocb *req = cmd_to_io_kiocb(ioucmd); @@ -42,7 +43,8 @@ static inline void io_req_set_cqe32_extra(struct io_kiocb *req, * Called by consumers of io_uring_cmd, if they originally returned * -EIOCBQUEUED upon receiving the command. */ -void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2) +void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2, + unsigned issue_flags) { struct io_kiocb *req = cmd_to_io_kiocb(ioucmd); @@ -52,11 +54,15 @@ void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2) io_req_set_res(req, ret, 0); if (req->ctx->flags & IORING_SETUP_CQE32) io_req_set_cqe32_extra(req, res2, 0); - if (req->ctx->flags & IORING_SETUP_IOPOLL) + if (req->ctx->flags & IORING_SETUP_IOPOLL) { /* order with io_iopoll_req_issued() checking ->iopoll_complete */ smp_store_release(&req->iopoll_completed, 1); - else - io_req_complete_post(req, 0); + } else { + struct io_tw_state ts = { + .locked = !(issue_flags & IO_URING_F_UNLOCKED), + }; + io_req_task_complete(req, &ts); + } } EXPORT_SYMBOL_GPL(io_uring_cmd_done); @@ -71,6 +77,7 @@ int io_uring_cmd_prep_async(struct io_kiocb *req) cmd_size = uring_cmd_pdu_size(req->ctx->flags & IORING_SETUP_SQE128); memcpy(req->async_data, ioucmd->cmd, cmd_size); + ioucmd->cmd = req->async_data; return 0; } @@ -108,7 +115,7 @@ int io_uring_cmd(struct io_kiocb *req, unsigned int issue_flags) struct file *file = req->file; int ret; - if (!req->file->f_op->uring_cmd) + if (!file->f_op->uring_cmd) return -EOPNOTSUPP; ret = security_uring_cmd(ioucmd); @@ -120,14 +127,13 @@ int io_uring_cmd(struct io_kiocb *req, unsigned int issue_flags) if (ctx->flags & IORING_SETUP_CQE32) issue_flags |= IO_URING_F_CQE32; if (ctx->flags & IORING_SETUP_IOPOLL) { + if (!file->f_op->uring_cmd_iopoll) + return -EOPNOTSUPP; issue_flags |= IO_URING_F_IOPOLL; req->iopoll_completed = 0; WRITE_ONCE(ioucmd->cookie, NULL); } - if (req_has_async_data(req)) - ioucmd->cmd = req->async_data; - ret = file->f_op->uring_cmd(ioucmd, issue_flags); if (ret == -EAGAIN) { if (!req_has_async_data(req)) { |