Diffstat (limited to 'io_uring')
-rw-r--r--  io_uring/alloc_cache.h  |  40
-rw-r--r--  io_uring/filetable.c    |  24
-rw-r--r--  io_uring/io-wq.c        | 538
-rw-r--r--  io_uring/io_uring.c     | 356
-rw-r--r--  io_uring/io_uring.h     |  49
-rw-r--r--  io_uring/kbuf.c         | 167
-rw-r--r--  io_uring/kbuf.h         |   7
-rw-r--r--  io_uring/msg_ring.c     |   4
-rw-r--r--  io_uring/net.c          |  29
-rw-r--r--  io_uring/net.h          |   5
-rw-r--r--  io_uring/notif.c        |   8
-rw-r--r--  io_uring/notif.h        |   3
-rw-r--r--  io_uring/poll.c         |  33
-rw-r--r--  io_uring/rsrc.c         | 364
-rw-r--r--  io_uring/rsrc.h         |  82
-rw-r--r--  io_uring/rw.c           |  45
-rw-r--r--  io_uring/slist.h        |   5
-rw-r--r--  io_uring/sqpoll.c       |   1
-rw-r--r--  io_uring/timeout.c      |  71
-rw-r--r--  io_uring/uring_cmd.c    |  28
20 files changed, 989 insertions(+), 870 deletions(-)
diff --git a/io_uring/alloc_cache.h b/io_uring/alloc_cache.h
index 729793ae9712..241245cb54a6 100644
--- a/io_uring/alloc_cache.h
+++ b/io_uring/alloc_cache.h
@@ -7,46 +7,60 @@
#define IO_ALLOC_CACHE_MAX 512
struct io_cache_entry {
- struct hlist_node node;
+ struct io_wq_work_node node;
};
static inline bool io_alloc_cache_put(struct io_alloc_cache *cache,
struct io_cache_entry *entry)
{
- if (cache->nr_cached < IO_ALLOC_CACHE_MAX) {
+ if (cache->nr_cached < cache->max_cached) {
cache->nr_cached++;
- hlist_add_head(&entry->node, &cache->list);
+ wq_stack_add_head(&entry->node, &cache->list);
+ /* KASAN poisons object */
+ kasan_slab_free_mempool(entry);
return true;
}
return false;
}
+static inline bool io_alloc_cache_empty(struct io_alloc_cache *cache)
+{
+ return !cache->list.next;
+}
+
static inline struct io_cache_entry *io_alloc_cache_get(struct io_alloc_cache *cache)
{
- if (!hlist_empty(&cache->list)) {
- struct hlist_node *node = cache->list.first;
+ if (cache->list.next) {
+ struct io_cache_entry *entry;
- hlist_del(node);
- return container_of(node, struct io_cache_entry, node);
+ entry = container_of(cache->list.next, struct io_cache_entry, node);
+ kasan_unpoison_range(entry, cache->elem_size);
+ cache->list.next = cache->list.next->next;
+ cache->nr_cached--;
+ return entry;
}
return NULL;
}
-static inline void io_alloc_cache_init(struct io_alloc_cache *cache)
+static inline void io_alloc_cache_init(struct io_alloc_cache *cache,
+ unsigned max_nr, size_t size)
{
- INIT_HLIST_HEAD(&cache->list);
+ cache->list.next = NULL;
cache->nr_cached = 0;
+ cache->max_cached = max_nr;
+ cache->elem_size = size;
}
static inline void io_alloc_cache_free(struct io_alloc_cache *cache,
void (*free)(struct io_cache_entry *))
{
- while (!hlist_empty(&cache->list)) {
- struct hlist_node *node = cache->list.first;
+ while (1) {
+ struct io_cache_entry *entry = io_alloc_cache_get(cache);
- hlist_del(node);
- free(container_of(node, struct io_cache_entry, node));
+ if (!entry)
+ break;
+ free(entry);
}
cache->nr_cached = 0;
}
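
For orientation, a minimal caller-side sketch of the reworked cache API above (illustrative only, not part of the patch; the object type and helpers are hypothetical, while the real users are the apoll, netmsg and rsrc_node caches touched later in this series). Objects embed a struct io_cache_entry and are recovered with container_of(); io_alloc_cache_put() recycles entries until max_cached is reached:

struct sample_obj {
	struct io_cache_entry cache;	/* embedded so container_of() can recover the object */
	int payload;
};

/* once, at setup: io_alloc_cache_init(cache, IO_ALLOC_CACHE_MAX, sizeof(struct sample_obj)); */

static struct sample_obj *sample_obj_alloc(struct io_alloc_cache *cache)
{
	struct io_cache_entry *entry = io_alloc_cache_get(cache);

	if (entry)	/* cache hit: the getter already KASAN-unpoisoned the range */
		return container_of(entry, struct sample_obj, cache);
	return kmalloc(sizeof(struct sample_obj), GFP_KERNEL);
}

static void sample_obj_free(struct io_alloc_cache *cache, struct sample_obj *obj)
{
	/* recycled (and KASAN-poisoned) while below max_cached, freed otherwise */
	if (!io_alloc_cache_put(cache, &obj->cache))
		kfree(obj);
}
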
diff --git a/io_uring/filetable.c b/io_uring/filetable.c
index 68dfc6936aa7..0f6fa791a47d 100644
--- a/io_uring/filetable.c
+++ b/io_uring/filetable.c
@@ -19,6 +19,9 @@ static int io_file_bitmap_get(struct io_ring_ctx *ctx)
unsigned long nr = ctx->file_alloc_end;
int ret;
+ if (!table->bitmap)
+ return -ENFILE;
+
do {
ret = find_next_zero_bit(table->bitmap, nr, table->alloc_hint);
if (ret != nr)
@@ -61,7 +64,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file,
u32 slot_index)
__must_hold(&req->ctx->uring_lock)
{
- bool needs_switch = false;
struct io_fixed_file *file_slot;
int ret;
@@ -78,18 +80,13 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file,
if (file_slot->file_ptr) {
struct file *old_file;
- ret = io_rsrc_node_switch_start(ctx);
- if (ret)
- goto err;
-
old_file = (struct file *)(file_slot->file_ptr & FFS_MASK);
- ret = io_queue_rsrc_removal(ctx->file_data, slot_index,
- ctx->rsrc_node, old_file);
+ ret = io_queue_rsrc_removal(ctx->file_data, slot_index, old_file);
if (ret)
- goto err;
+ return ret;
+
file_slot->file_ptr = 0;
io_file_bitmap_clear(&ctx->file_table, slot_index);
- needs_switch = true;
}
ret = io_scm_file_account(ctx, file);
@@ -98,9 +95,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file,
io_fixed_file_set(file_slot, file);
io_file_bitmap_set(&ctx->file_table, slot_index);
}
-err:
- if (needs_switch)
- io_rsrc_node_switch(ctx, ctx->file_data);
return ret;
}
@@ -153,9 +147,6 @@ int io_fixed_fd_remove(struct io_ring_ctx *ctx, unsigned int offset)
return -ENXIO;
if (offset >= ctx->nr_user_files)
return -EINVAL;
- ret = io_rsrc_node_switch_start(ctx);
- if (ret)
- return ret;
offset = array_index_nospec(offset, ctx->nr_user_files);
file_slot = io_fixed_file_slot(&ctx->file_table, offset);
@@ -163,13 +154,12 @@ int io_fixed_fd_remove(struct io_ring_ctx *ctx, unsigned int offset)
return -EBADF;
file = (struct file *)(file_slot->file_ptr & FFS_MASK);
- ret = io_queue_rsrc_removal(ctx->file_data, offset, ctx->rsrc_node, file);
+ ret = io_queue_rsrc_removal(ctx->file_data, offset, file);
if (ret)
return ret;
file_slot->file_ptr = 0;
io_file_bitmap_clear(&ctx->file_table, offset);
- io_rsrc_node_switch(ctx, ctx->file_data);
return 0;
}
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 411bb2d1acd4..b2715988791e 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -15,6 +15,7 @@
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
+#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>
#include "io-wq.h"
@@ -39,7 +40,7 @@ enum {
};
/*
- * One for each thread in a wqe pool
+ * One for each thread in a wq pool
*/
struct io_worker {
refcount_t ref;
@@ -47,7 +48,7 @@ struct io_worker {
struct hlist_nulls_node nulls_node;
struct list_head all_list;
struct task_struct *task;
- struct io_wqe *wqe;
+ struct io_wq *wq;
struct io_wq_work *cur_work;
struct io_wq_work *next_work;
@@ -73,7 +74,7 @@ struct io_worker {
#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER)
-struct io_wqe_acct {
+struct io_wq_acct {
unsigned nr_workers;
unsigned max_workers;
int index;
@@ -90,26 +91,6 @@ enum {
};
/*
- * Per-node worker thread pool
- */
-struct io_wqe {
- raw_spinlock_t lock;
- struct io_wqe_acct acct[IO_WQ_ACCT_NR];
-
- int node;
-
- struct hlist_nulls_head free_list;
- struct list_head all_list;
-
- struct wait_queue_entry wait;
-
- struct io_wq *wq;
- struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
-
- cpumask_var_t cpu_mask;
-};
-
-/*
* Per io_wq state
*/
struct io_wq {
@@ -127,7 +108,19 @@ struct io_wq {
struct task_struct *task;
- struct io_wqe *wqes[];
+ struct io_wq_acct acct[IO_WQ_ACCT_NR];
+
+ /* lock protects access to elements below */
+ raw_spinlock_t lock;
+
+ struct hlist_nulls_head free_list;
+ struct list_head all_list;
+
+ struct wait_queue_entry wait;
+
+ struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
+
+ cpumask_var_t cpu_mask;
};
static enum cpuhp_state io_wq_online;
@@ -140,10 +133,10 @@ struct io_cb_cancel_data {
bool cancel_all;
};
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
-static void io_wqe_dec_running(struct io_worker *worker);
-static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
- struct io_wqe_acct *acct,
+static bool create_io_worker(struct io_wq *wq, int index);
+static void io_wq_dec_running(struct io_worker *worker);
+static bool io_acct_cancel_pending_work(struct io_wq *wq,
+ struct io_wq_acct *acct,
struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);
@@ -159,20 +152,20 @@ static void io_worker_release(struct io_worker *worker)
complete(&worker->ref_done);
}
-static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
+static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
- return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
+ return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}
-static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
- struct io_wq_work *work)
+static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
+ struct io_wq_work *work)
{
- return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
+ return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
}
-static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
+static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
- return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
+ return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
}
static void io_worker_ref_put(struct io_wq *wq)
@@ -183,14 +176,13 @@ static void io_worker_ref_put(struct io_wq *wq)
static void io_worker_cancel_cb(struct io_worker *worker)
{
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
- struct io_wqe *wqe = worker->wqe;
- struct io_wq *wq = wqe->wq;
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
+ struct io_wq *wq = worker->wq;
atomic_dec(&acct->nr_running);
- raw_spin_lock(&worker->wqe->lock);
+ raw_spin_lock(&wq->lock);
acct->nr_workers--;
- raw_spin_unlock(&worker->wqe->lock);
+ raw_spin_unlock(&wq->lock);
io_worker_ref_put(wq);
clear_bit_unlock(0, &worker->create_state);
io_worker_release(worker);
@@ -208,8 +200,7 @@ static bool io_task_worker_match(struct callback_head *cb, void *data)
static void io_worker_exit(struct io_worker *worker)
{
- struct io_wqe *wqe = worker->wqe;
- struct io_wq *wq = wqe->wq;
+ struct io_wq *wq = worker->wq;
while (1) {
struct callback_head *cb = task_work_cancel_match(wq->task,
@@ -223,23 +214,23 @@ static void io_worker_exit(struct io_worker *worker)
io_worker_release(worker);
wait_for_completion(&worker->ref_done);
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&wq->lock);
if (worker->flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
- raw_spin_unlock(&wqe->lock);
- io_wqe_dec_running(worker);
+ raw_spin_unlock(&wq->lock);
+ io_wq_dec_running(worker);
worker->flags = 0;
preempt_disable();
current->flags &= ~PF_IO_WORKER;
preempt_enable();
kfree_rcu(worker, rcu);
- io_worker_ref_put(wqe->wq);
+ io_worker_ref_put(wq);
do_exit(0);
}
-static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
+static inline bool io_acct_run_queue(struct io_wq_acct *acct)
{
bool ret = false;
@@ -256,8 +247,8 @@ static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
* Check head of free list for an available worker. If one isn't available,
* caller must create one.
*/
-static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
- struct io_wqe_acct *acct)
+static bool io_wq_activate_free_worker(struct io_wq *wq,
+ struct io_wq_acct *acct)
__must_hold(RCU)
{
struct hlist_nulls_node *n;
@@ -268,10 +259,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
* activate. If a given worker is on the free_list but in the process
* of exiting, keep trying.
*/
- hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
+ hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
if (!io_worker_get(worker))
continue;
- if (io_wqe_get_acct(worker) != acct) {
+ if (io_wq_get_acct(worker) != acct) {
io_worker_release(worker);
continue;
}
@@ -289,7 +280,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
* We need a worker. If we find a free one, we're good. If not, and we're
* below the max number of workers, create one.
*/
-static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
+static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
/*
* Most likely an attempt to queue unbounded work on an io_wq that
@@ -298,21 +289,21 @@ static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
if (unlikely(!acct->max_workers))
pr_warn_once("io-wq is not configured for unbound workers");
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&wq->lock);
if (acct->nr_workers >= acct->max_workers) {
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
return true;
}
acct->nr_workers++;
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
atomic_inc(&acct->nr_running);
- atomic_inc(&wqe->wq->worker_refs);
- return create_io_worker(wqe->wq, wqe, acct->index);
+ atomic_inc(&wq->worker_refs);
+ return create_io_worker(wq, acct->index);
}
-static void io_wqe_inc_running(struct io_worker *worker)
+static void io_wq_inc_running(struct io_worker *worker)
{
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
atomic_inc(&acct->nr_running);
}
@@ -321,22 +312,22 @@ static void create_worker_cb(struct callback_head *cb)
{
struct io_worker *worker;
struct io_wq *wq;
- struct io_wqe *wqe;
- struct io_wqe_acct *acct;
+
+ struct io_wq_acct *acct;
bool do_create = false;
worker = container_of(cb, struct io_worker, create_work);
- wqe = worker->wqe;
- wq = wqe->wq;
- acct = &wqe->acct[worker->create_index];
- raw_spin_lock(&wqe->lock);
+ wq = worker->wq;
+ acct = &wq->acct[worker->create_index];
+ raw_spin_lock(&wq->lock);
+
if (acct->nr_workers < acct->max_workers) {
acct->nr_workers++;
do_create = true;
}
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
if (do_create) {
- create_io_worker(wq, wqe, worker->create_index);
+ create_io_worker(wq, worker->create_index);
} else {
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
@@ -346,11 +337,10 @@ static void create_worker_cb(struct callback_head *cb)
}
static bool io_queue_worker_create(struct io_worker *worker,
- struct io_wqe_acct *acct,
+ struct io_wq_acct *acct,
task_work_func_t func)
{
- struct io_wqe *wqe = worker->wqe;
- struct io_wq *wq = wqe->wq;
+ struct io_wq *wq = worker->wq;
/* raced with exit, just ignore create call */
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
@@ -392,10 +382,10 @@ fail:
return false;
}
-static void io_wqe_dec_running(struct io_worker *worker)
+static void io_wq_dec_running(struct io_worker *worker)
{
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
- struct io_wqe *wqe = worker->wqe;
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
+ struct io_wq *wq = worker->wq;
if (!(worker->flags & IO_WORKER_F_UP))
return;
@@ -406,7 +396,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
return;
atomic_inc(&acct->nr_running);
- atomic_inc(&wqe->wq->worker_refs);
+ atomic_inc(&wq->worker_refs);
io_queue_worker_create(worker, acct, create_worker_cb);
}
@@ -414,29 +404,25 @@ static void io_wqe_dec_running(struct io_worker *worker)
* Worker will start processing some work. Move it to the busy list, if
* it's currently on the freelist
*/
-static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
+static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
if (worker->flags & IO_WORKER_F_FREE) {
worker->flags &= ~IO_WORKER_F_FREE;
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&wq->lock);
hlist_nulls_del_init_rcu(&worker->nulls_node);
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
}
}
/*
- * No work, worker going to sleep. Move to freelist, and unuse mm if we
- * have one attached. Dropping the mm may potentially sleep, so we drop
- * the lock in that case and return success. Since the caller has to
- * retry the loop in that case (we changed task state), we don't regrab
- * the lock if we return success.
+ * No work, worker going to sleep. Move to freelist.
*/
-static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
- __must_hold(wqe->lock)
+static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
+ __must_hold(wq->lock)
{
if (!(worker->flags & IO_WORKER_F_FREE)) {
worker->flags |= IO_WORKER_F_FREE;
- hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
+ hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
}
}
@@ -445,17 +431,16 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
return work->flags >> IO_WQ_HASH_SHIFT;
}
-static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
+static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
- struct io_wq *wq = wqe->wq;
bool ret = false;
spin_lock_irq(&wq->hash->wait.lock);
- if (list_empty(&wqe->wait.entry)) {
- __add_wait_queue(&wq->hash->wait, &wqe->wait);
+ if (list_empty(&wq->wait.entry)) {
+ __add_wait_queue(&wq->hash->wait, &wq->wait);
if (!test_bit(hash, &wq->hash->map)) {
__set_current_state(TASK_RUNNING);
- list_del_init(&wqe->wait.entry);
+ list_del_init(&wq->wait.entry);
ret = true;
}
}
@@ -463,14 +448,14 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
return ret;
}
-static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
+static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
struct io_worker *worker)
__must_hold(acct->lock)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work, *tail;
unsigned int stall_hash = -1U;
- struct io_wqe *wqe = worker->wqe;
+ struct io_wq *wq = worker->wq;
wq_list_for_each(node, prev, &acct->work_list) {
unsigned int hash;
@@ -485,11 +470,11 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
hash = io_get_work_hash(work);
/* all items with this hash lie in [work, tail] */
- tail = wqe->hash_tail[hash];
+ tail = wq->hash_tail[hash];
/* hashed, can run if not already running */
- if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
- wqe->hash_tail[hash] = NULL;
+ if (!test_and_set_bit(hash, &wq->hash->map)) {
+ wq->hash_tail[hash] = NULL;
wq_list_cut(&acct->work_list, &tail->list, prev);
return work;
}
@@ -508,12 +493,12 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
- unstalled = io_wait_on_hash(wqe, stall_hash);
+ unstalled = io_wait_on_hash(wq, stall_hash);
raw_spin_lock(&acct->lock);
if (unstalled) {
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
- if (wq_has_sleeper(&wqe->wq->hash->wait))
- wake_up(&wqe->wq->hash->wait);
+ if (wq_has_sleeper(&wq->hash->wait))
+ wake_up(&wq->hash->wait);
}
}
@@ -534,13 +519,10 @@ static void io_assign_current_work(struct io_worker *worker,
raw_spin_unlock(&worker->lock);
}
-static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
-
static void io_worker_handle_work(struct io_worker *worker)
{
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
- struct io_wqe *wqe = worker->wqe;
- struct io_wq *wq = wqe->wq;
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
+ struct io_wq *wq = worker->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
do {
@@ -557,7 +539,7 @@ static void io_worker_handle_work(struct io_worker *worker)
work = io_get_next_work(acct, worker);
raw_spin_unlock(&acct->lock);
if (work) {
- __io_worker_busy(wqe, worker);
+ __io_worker_busy(wq, worker);
/*
* Make sure cancelation can find this, even before
@@ -595,7 +577,7 @@ static void io_worker_handle_work(struct io_worker *worker)
}
io_assign_current_work(worker, work);
if (linked)
- io_wqe_enqueue(wqe, linked);
+ io_wq_enqueue(wq, linked);
if (hash != -1U && !next_hashed) {
/* serialize hash clear with wake_up() */
@@ -610,13 +592,12 @@ static void io_worker_handle_work(struct io_worker *worker)
} while (1);
}
-static int io_wqe_worker(void *data)
+static int io_wq_worker(void *data)
{
struct io_worker *worker = data;
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
- struct io_wqe *wqe = worker->wqe;
- struct io_wq *wq = wqe->wq;
- bool last_timeout = false;
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
+ struct io_wq *wq = worker->wq;
+ bool exit_mask = false, last_timeout = false;
char buf[TASK_COMM_LEN];
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
@@ -631,17 +612,20 @@ static int io_wqe_worker(void *data)
while (io_acct_run_queue(acct))
io_worker_handle_work(worker);
- raw_spin_lock(&wqe->lock);
- /* timed out, exit unless we're the last worker */
- if (last_timeout && acct->nr_workers > 1) {
+ raw_spin_lock(&wq->lock);
+ /*
+ * Last sleep timed out. Exit if we're not the last worker,
+ * or if someone modified our affinity.
+ */
+ if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
acct->nr_workers--;
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
__set_current_state(TASK_RUNNING);
break;
}
last_timeout = false;
- __io_worker_idle(wqe, worker);
- raw_spin_unlock(&wqe->lock);
+ __io_worker_idle(wq, worker);
+ raw_spin_unlock(&wq->lock);
if (io_run_task_work())
continue;
ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
@@ -652,7 +636,11 @@ static int io_wqe_worker(void *data)
continue;
break;
}
- last_timeout = !ret;
+ if (!ret) {
+ last_timeout = true;
+ exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
+ wq->cpu_mask);
+ }
}
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
@@ -676,7 +664,7 @@ void io_wq_worker_running(struct task_struct *tsk)
if (worker->flags & IO_WORKER_F_RUNNING)
return;
worker->flags |= IO_WORKER_F_RUNNING;
- io_wqe_inc_running(worker);
+ io_wq_inc_running(worker);
}
/*
@@ -695,22 +683,21 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
return;
worker->flags &= ~IO_WORKER_F_RUNNING;
- io_wqe_dec_running(worker);
+ io_wq_dec_running(worker);
}
-static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
+static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
struct task_struct *tsk)
{
tsk->worker_private = worker;
worker->task = tsk;
- set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
- tsk->flags |= PF_NO_SETAFFINITY;
+ set_cpus_allowed_ptr(tsk, wq->cpu_mask);
- raw_spin_lock(&wqe->lock);
- hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
- list_add_tail_rcu(&worker->all_list, &wqe->all_list);
+ raw_spin_lock(&wq->lock);
+ hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
+ list_add_tail_rcu(&worker->all_list, &wq->all_list);
worker->flags |= IO_WORKER_F_FREE;
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
wake_up_new_task(tsk);
}
@@ -743,21 +730,21 @@ static void create_worker_cont(struct callback_head *cb)
{
struct io_worker *worker;
struct task_struct *tsk;
- struct io_wqe *wqe;
+ struct io_wq *wq;
worker = container_of(cb, struct io_worker, create_work);
clear_bit_unlock(0, &worker->create_state);
- wqe = worker->wqe;
- tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+ wq = worker->wq;
+ tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
- io_init_new_worker(wqe, worker, tsk);
+ io_init_new_worker(wq, worker, tsk);
io_worker_release(worker);
return;
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
atomic_dec(&acct->nr_running);
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&wq->lock);
acct->nr_workers--;
if (!acct->nr_workers) {
struct io_cb_cancel_data match = {
@@ -765,13 +752,13 @@ static void create_worker_cont(struct callback_head *cb)
.cancel_all = true,
};
- raw_spin_unlock(&wqe->lock);
- while (io_acct_cancel_pending_work(wqe, acct, &match))
+ raw_spin_unlock(&wq->lock);
+ while (io_acct_cancel_pending_work(wq, acct, &match))
;
} else {
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
}
- io_worker_ref_put(wqe->wq);
+ io_worker_ref_put(wq);
kfree(worker);
return;
}
@@ -784,42 +771,42 @@ static void create_worker_cont(struct callback_head *cb)
static void io_workqueue_create(struct work_struct *work)
{
struct io_worker *worker = container_of(work, struct io_worker, work);
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
if (!io_queue_worker_create(worker, acct, create_worker_cont))
kfree(worker);
}
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static bool create_io_worker(struct io_wq *wq, int index)
{
- struct io_wqe_acct *acct = &wqe->acct[index];
+ struct io_wq_acct *acct = &wq->acct[index];
struct io_worker *worker;
struct task_struct *tsk;
__set_current_state(TASK_RUNNING);
- worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
+ worker = kzalloc(sizeof(*worker), GFP_KERNEL);
if (!worker) {
fail:
atomic_dec(&acct->nr_running);
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&wq->lock);
acct->nr_workers--;
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
io_worker_ref_put(wq);
return false;
}
refcount_set(&worker->ref, 1);
- worker->wqe = wqe;
+ worker->wq = wq;
raw_spin_lock_init(&worker->lock);
init_completion(&worker->ref_done);
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
- tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+ tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
- io_init_new_worker(wqe, worker, tsk);
+ io_init_new_worker(wq, worker, tsk);
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
kfree(worker);
goto fail;
@@ -835,14 +822,14 @@ fail:
* Iterate the passed in list and call the specific function for each
* worker that isn't exiting
*/
-static bool io_wq_for_each_worker(struct io_wqe *wqe,
+static bool io_wq_for_each_worker(struct io_wq *wq,
bool (*func)(struct io_worker *, void *),
void *data)
{
struct io_worker *worker;
bool ret = false;
- list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
+ list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
if (io_worker_get(worker)) {
/* no task if node is/was offline */
if (worker->task)
@@ -863,10 +850,8 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
return false;
}
-static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
+static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
- struct io_wq *wq = wqe->wq;
-
do {
work->flags |= IO_WQ_WORK_CANCEL;
wq->do_work(work);
@@ -874,9 +859,9 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
} while (work);
}
-static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
+static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
- struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash;
struct io_wq_work *tail;
@@ -887,8 +872,8 @@ append:
}
hash = io_get_work_hash(work);
- tail = wqe->hash_tail[hash];
- wqe->hash_tail[hash] = work;
+ tail = wq->hash_tail[hash];
+ wq->hash_tail[hash] = work;
if (!tail)
goto append;
@@ -900,9 +885,9 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
return work == data;
}
-static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
+void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
- struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ struct io_wq_acct *acct = io_work_get_acct(wq, work);
struct io_cb_cancel_data match;
unsigned work_flags = work->flags;
bool do_create;
@@ -911,55 +896,48 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
* If io-wq is exiting for this task, or if the request has explicitly
* been marked as one that should not get executed, cancel it here.
*/
- if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
+ if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
(work->flags & IO_WQ_WORK_CANCEL)) {
- io_run_cancel(work, wqe);
+ io_run_cancel(work, wq);
return;
}
raw_spin_lock(&acct->lock);
- io_wqe_insert_work(wqe, work);
+ io_wq_insert_work(wq, work);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&wq->lock);
rcu_read_lock();
- do_create = !io_wqe_activate_free_worker(wqe, acct);
+ do_create = !io_wq_activate_free_worker(wq, acct);
rcu_read_unlock();
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running))) {
bool did_create;
- did_create = io_wqe_create_worker(wqe, acct);
+ did_create = io_wq_create_worker(wq, acct);
if (likely(did_create))
return;
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&wq->lock);
if (acct->nr_workers) {
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
return;
}
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&wq->lock);
/* fatal condition, failed to create the first worker */
match.fn = io_wq_work_match_item,
match.data = work,
match.cancel_all = false,
- io_acct_cancel_pending_work(wqe, acct, &match);
+ io_acct_cancel_pending_work(wq, acct, &match);
}
}
-void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
-{
- struct io_wqe *wqe = wq->wqes[numa_node_id()];
-
- io_wqe_enqueue(wqe, work);
-}
-
/*
* Work items that hash to the same value will not be done in parallel.
* Used to limit concurrent writes, generally hashed by inode.
@@ -1002,27 +980,27 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
return match->nr_running && !match->cancel_all;
}
-static inline void io_wqe_remove_pending(struct io_wqe *wqe,
+static inline void io_wq_remove_pending(struct io_wq *wq,
struct io_wq_work *work,
struct io_wq_work_node *prev)
{
- struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL;
- if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
+ if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
if (prev)
prev_work = container_of(prev, struct io_wq_work, list);
if (prev_work && io_get_work_hash(prev_work) == hash)
- wqe->hash_tail[hash] = prev_work;
+ wq->hash_tail[hash] = prev_work;
else
- wqe->hash_tail[hash] = NULL;
+ wq->hash_tail[hash] = NULL;
}
wq_list_del(&acct->work_list, &work->list, prev);
}
-static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
- struct io_wqe_acct *acct,
+static bool io_acct_cancel_pending_work(struct io_wq *wq,
+ struct io_wq_acct *acct,
struct io_cb_cancel_data *match)
{
struct io_wq_work_node *node, *prev;
@@ -1033,9 +1011,9 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
- io_wqe_remove_pending(wqe, work, prev);
+ io_wq_remove_pending(wq, work, prev);
raw_spin_unlock(&acct->lock);
- io_run_cancel(work, wqe);
+ io_run_cancel(work, wq);
match->nr_pending++;
/* not safe to continue after unlock */
return true;
@@ -1045,15 +1023,15 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
return false;
}
-static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
- struct io_cb_cancel_data *match)
+static void io_wq_cancel_pending_work(struct io_wq *wq,
+ struct io_cb_cancel_data *match)
{
int i;
retry:
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
- struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
+ struct io_wq_acct *acct = io_get_acct(wq, i == 0);
- if (io_acct_cancel_pending_work(wqe, acct, match)) {
+ if (io_acct_cancel_pending_work(wq, acct, match)) {
if (match->cancel_all)
goto retry;
break;
@@ -1061,11 +1039,11 @@ retry:
}
}
-static void io_wqe_cancel_running_work(struct io_wqe *wqe,
+static void io_wq_cancel_running_work(struct io_wq *wq,
struct io_cb_cancel_data *match)
{
rcu_read_lock();
- io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
+ io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
rcu_read_unlock();
}
@@ -1077,7 +1055,6 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
.data = data,
.cancel_all = cancel_all,
};
- int node;
/*
* First check pending list, if we're lucky we can just remove it
@@ -1089,22 +1066,18 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
* as an indication that we attempt to signal cancellation. The
* completion will run normally in this case.
*
- * Do both of these while holding the wqe->lock, to ensure that
+ * Do both of these while holding the wq->lock, to ensure that
* we'll find a work item regardless of state.
*/
- for_each_node(node) {
- struct io_wqe *wqe = wq->wqes[node];
-
- io_wqe_cancel_pending_work(wqe, &match);
- if (match.nr_pending && !match.cancel_all)
- return IO_WQ_CANCEL_OK;
-
- raw_spin_lock(&wqe->lock);
- io_wqe_cancel_running_work(wqe, &match);
- raw_spin_unlock(&wqe->lock);
- if (match.nr_running && !match.cancel_all)
- return IO_WQ_CANCEL_RUNNING;
- }
+ io_wq_cancel_pending_work(wq, &match);
+ if (match.nr_pending && !match.cancel_all)
+ return IO_WQ_CANCEL_OK;
+
+ raw_spin_lock(&wq->lock);
+ io_wq_cancel_running_work(wq, &match);
+ raw_spin_unlock(&wq->lock);
+ if (match.nr_running && !match.cancel_all)
+ return IO_WQ_CANCEL_RUNNING;
if (match.nr_running)
return IO_WQ_CANCEL_RUNNING;
@@ -1113,20 +1086,20 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
return IO_WQ_CANCEL_NOTFOUND;
}
-static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
+static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
int sync, void *key)
{
- struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+ struct io_wq *wq = container_of(wait, struct io_wq, wait);
int i;
list_del_init(&wait->entry);
rcu_read_lock();
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
- struct io_wqe_acct *acct = &wqe->acct[i];
+ struct io_wq_acct *acct = &wq->acct[i];
if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
- io_wqe_activate_free_worker(wqe, acct);
+ io_wq_activate_free_worker(wq, acct);
}
rcu_read_unlock();
return 1;
@@ -1134,7 +1107,7 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
- int ret, node, i;
+ int ret, i;
struct io_wq *wq;
if (WARN_ON_ONCE(!data->free_work || !data->do_work))
@@ -1142,7 +1115,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
if (WARN_ON_ONCE(!bounded))
return ERR_PTR(-EINVAL);
- wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
+ wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
if (!wq)
return ERR_PTR(-ENOMEM);
ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1155,39 +1128,28 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
wq->do_work = data->do_work;
ret = -ENOMEM;
- for_each_node(node) {
- struct io_wqe *wqe;
- int alloc_node = node;
-
- if (!node_online(alloc_node))
- alloc_node = NUMA_NO_NODE;
- wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
- if (!wqe)
- goto err;
- wq->wqes[node] = wqe;
- if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
- goto err;
- cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
- wqe->node = alloc_node;
- wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
- wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
- task_rlimit(current, RLIMIT_NPROC);
- INIT_LIST_HEAD(&wqe->wait.entry);
- wqe->wait.func = io_wqe_hash_wake;
- for (i = 0; i < IO_WQ_ACCT_NR; i++) {
- struct io_wqe_acct *acct = &wqe->acct[i];
-
- acct->index = i;
- atomic_set(&acct->nr_running, 0);
- INIT_WQ_LIST(&acct->work_list);
- raw_spin_lock_init(&acct->lock);
- }
- wqe->wq = wq;
- raw_spin_lock_init(&wqe->lock);
- INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
- INIT_LIST_HEAD(&wqe->all_list);
+
+ if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
+ goto err;
+ cpumask_copy(wq->cpu_mask, cpu_possible_mask);
+ wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
+ wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
+ task_rlimit(current, RLIMIT_NPROC);
+ INIT_LIST_HEAD(&wq->wait.entry);
+ wq->wait.func = io_wq_hash_wake;
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ struct io_wq_acct *acct = &wq->acct[i];
+
+ acct->index = i;
+ atomic_set(&acct->nr_running, 0);
+ INIT_WQ_LIST(&acct->work_list);
+ raw_spin_lock_init(&acct->lock);
}
+ raw_spin_lock_init(&wq->lock);
+ INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
+ INIT_LIST_HEAD(&wq->all_list);
+
wq->task = get_task_struct(data->task);
atomic_set(&wq->worker_refs, 1);
init_completion(&wq->worker_done);
@@ -1195,12 +1157,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
err:
io_wq_put_hash(data->hash);
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
- for_each_node(node) {
- if (!wq->wqes[node])
- continue;
- free_cpumask_var(wq->wqes[node]->cpu_mask);
- kfree(wq->wqes[node]);
- }
+
+ free_cpumask_var(wq->cpu_mask);
err_wq:
kfree(wq);
return ERR_PTR(ret);
@@ -1213,7 +1171,7 @@ static bool io_task_work_match(struct callback_head *cb, void *data)
if (cb->func != create_worker_cb && cb->func != create_worker_cont)
return false;
worker = container_of(cb, struct io_worker, create_work);
- return worker->wqe->wq == data;
+ return worker->wq == data;
}
void io_wq_exit_start(struct io_wq *wq)
@@ -1241,48 +1199,35 @@ static void io_wq_cancel_tw_create(struct io_wq *wq)
static void io_wq_exit_workers(struct io_wq *wq)
{
- int node;
-
if (!wq->task)
return;
io_wq_cancel_tw_create(wq);
rcu_read_lock();
- for_each_node(node) {
- struct io_wqe *wqe = wq->wqes[node];
-
- io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
- }
+ io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
rcu_read_unlock();
io_worker_ref_put(wq);
wait_for_completion(&wq->worker_done);
- for_each_node(node) {
- spin_lock_irq(&wq->hash->wait.lock);
- list_del_init(&wq->wqes[node]->wait.entry);
- spin_unlock_irq(&wq->hash->wait.lock);
- }
+ spin_lock_irq(&wq->hash->wait.lock);
+ list_del_init(&wq->wait.entry);
+ spin_unlock_irq(&wq->hash->wait.lock);
+
put_task_struct(wq->task);
wq->task = NULL;
}
static void io_wq_destroy(struct io_wq *wq)
{
- int node;
+ struct io_cb_cancel_data match = {
+ .fn = io_wq_work_match_all,
+ .cancel_all = true,
+ };
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-
- for_each_node(node) {
- struct io_wqe *wqe = wq->wqes[node];
- struct io_cb_cancel_data match = {
- .fn = io_wq_work_match_all,
- .cancel_all = true,
- };
- io_wqe_cancel_pending_work(wqe, &match);
- free_cpumask_var(wqe->cpu_mask);
- kfree(wqe);
- }
+ io_wq_cancel_pending_work(wq, &match);
+ free_cpumask_var(wq->cpu_mask);
io_wq_put_hash(wq->hash);
kfree(wq);
}
@@ -1305,9 +1250,9 @@ static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
struct online_data *od = data;
if (od->online)
- cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
+ cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
else
- cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
+ cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
return false;
}
@@ -1317,11 +1262,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
.cpu = cpu,
.online = online
};
- int i;
rcu_read_lock();
- for_each_node(i)
- io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
+ io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
rcu_read_unlock();
return 0;
}
@@ -1342,18 +1285,13 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
- int i;
-
rcu_read_lock();
- for_each_node(i) {
- struct io_wqe *wqe = wq->wqes[i];
-
- if (mask)
- cpumask_copy(wqe->cpu_mask, mask);
- else
- cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
- }
+ if (mask)
+ cpumask_copy(wq->cpu_mask, mask);
+ else
+ cpumask_copy(wq->cpu_mask, cpu_possible_mask);
rcu_read_unlock();
+
return 0;
}
@@ -1363,9 +1301,9 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
*/
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
+ struct io_wq_acct *acct;
int prev[IO_WQ_ACCT_NR];
- bool first_node = true;
- int i, node;
+ int i;
BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
@@ -1380,21 +1318,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
prev[i] = 0;
rcu_read_lock();
- for_each_node(node) {
- struct io_wqe *wqe = wq->wqes[node];
- struct io_wqe_acct *acct;
-
- raw_spin_lock(&wqe->lock);
- for (i = 0; i < IO_WQ_ACCT_NR; i++) {
- acct = &wqe->acct[i];
- if (first_node)
- prev[i] = max_t(int, acct->max_workers, prev[i]);
- if (new_count[i])
- acct->max_workers = new_count[i];
- }
- raw_spin_unlock(&wqe->lock);
- first_node = false;
+
+ raw_spin_lock(&wq->lock);
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ acct = &wq->acct[i];
+ prev[i] = max_t(int, acct->max_workers, prev[i]);
+ if (new_count[i])
+ acct->max_workers = new_count[i];
}
+ raw_spin_unlock(&wq->lock);
rcu_read_unlock();
for (i = 0; i < IO_WQ_ACCT_NR; i++)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index fd1cc35a1c00..3bca7a79efda 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -72,6 +72,7 @@
#include <linux/io_uring.h>
#include <linux/audit.h>
#include <linux/security.h>
+#include <asm/shmparam.h>
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
@@ -246,12 +247,12 @@ static __cold void io_fallback_req_func(struct work_struct *work)
fallback_work.work);
struct llist_node *node = llist_del_all(&ctx->fallback_llist);
struct io_kiocb *req, *tmp;
- bool locked = true;
+ struct io_tw_state ts = { .locked = true, };
mutex_lock(&ctx->uring_lock);
llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
- req->io_task_work.func(req, &locked);
- if (WARN_ON_ONCE(!locked))
+ req->io_task_work.func(req, &ts);
+ if (WARN_ON_ONCE(!ts.locked))
return;
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
@@ -309,13 +310,18 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->sqd_list);
INIT_LIST_HEAD(&ctx->cq_overflow_list);
INIT_LIST_HEAD(&ctx->io_buffers_cache);
- io_alloc_cache_init(&ctx->apoll_cache);
- io_alloc_cache_init(&ctx->netmsg_cache);
+ io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX,
+ sizeof(struct io_rsrc_node));
+ io_alloc_cache_init(&ctx->apoll_cache, IO_ALLOC_CACHE_MAX,
+ sizeof(struct async_poll));
+ io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
+ sizeof(struct io_async_msghdr));
init_completion(&ctx->ref_comp);
xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->cq_wait);
init_waitqueue_head(&ctx->poll_wq);
+ init_waitqueue_head(&ctx->rsrc_quiesce_wq);
spin_lock_init(&ctx->completion_lock);
spin_lock_init(&ctx->timeout_lock);
INIT_WQ_LIST(&ctx->iopoll_list);
@@ -324,11 +330,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
- spin_lock_init(&ctx->rsrc_ref_lock);
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
- INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
- init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw);
- init_llist_head(&ctx->rsrc_put_llist);
init_llist_head(&ctx->work_llist);
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
@@ -424,8 +426,14 @@ static void io_prep_async_work(struct io_kiocb *req)
if (req->file && !io_req_ffs_set(req))
req->flags |= io_file_get_flags(req->file) << REQ_F_SUPPORT_NOWAIT_BIT;
- if (req->flags & REQ_F_ISREG) {
- if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
+ if (req->file && (req->flags & REQ_F_ISREG)) {
+ bool should_hash = def->hash_reg_file;
+
+ /* don't serialize this request if the fs doesn't need it */
+ if (should_hash && (req->file->f_flags & O_DIRECT) &&
+ (req->file->f_mode & FMODE_DIO_PARALLEL_WRITE))
+ should_hash = false;
+ if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
io_wq_hash_work(&req->work, file_inode(req->file));
} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
if (def->unbound_nonreg_file)
@@ -450,7 +458,7 @@ static void io_prep_async_link(struct io_kiocb *req)
}
}
-void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
+void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use)
{
struct io_kiocb *link = io_prep_linked_timeout(req);
struct io_uring_task *tctx = req->task->io_uring;
@@ -620,22 +628,22 @@ static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
io_cqring_wake(ctx);
}
-static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
+static void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
__releases(ctx->completion_lock)
{
io_commit_cqring(ctx);
- __io_cq_unlock(ctx);
- io_commit_cqring_flush(ctx);
- /*
- * As ->task_complete implies that the ring is single tasked, cq_wait
- * may only be waited on by the current in io_cqring_wait(), but since
- * it will re-check the wakeup conditions once we return we can safely
- * skip waking it up.
- */
- if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
- smp_mb();
- __io_cqring_wake(ctx);
+ if (ctx->task_complete) {
+ /*
+ * ->task_complete implies that only current might be waiting
+ * for CQEs, and obviously, we currently don't. No one is
+ * waiting, wakeups are futile, skip them.
+ */
+ io_commit_cqring_flush(ctx);
+ } else {
+ __io_cq_unlock(ctx);
+ io_commit_cqring_flush(ctx);
+ io_cqring_wake(ctx);
}
}
@@ -960,9 +968,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32
return true;
}
-static void __io_req_complete_post(struct io_kiocb *req)
+static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
+ struct io_rsrc_node *rsrc_node = NULL;
io_cq_lock(ctx);
if (!(req->flags & REQ_F_CQE_SKIP))
@@ -983,7 +992,7 @@ static void __io_req_complete_post(struct io_kiocb *req)
}
io_put_kbuf_comp(req);
io_dismantle_req(req);
- io_req_put_rsrc(req);
+ rsrc_node = req->rsrc_node;
/*
* Selected buffer deallocation in io_clean_op() assumes that
* we don't hold ->completion_lock. Clean them here to avoid
@@ -994,21 +1003,27 @@ static void __io_req_complete_post(struct io_kiocb *req)
ctx->locked_free_nr++;
}
io_cq_unlock_post(ctx);
+
+ if (rsrc_node) {
+ io_ring_submit_lock(ctx, issue_flags);
+ io_put_rsrc_node(ctx, rsrc_node);
+ io_ring_submit_unlock(ctx, issue_flags);
+ }
}
void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
- if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) {
+ if (req->ctx->task_complete && req->ctx->submitter_task != current) {
req->io_task_work.func = io_req_task_complete;
io_req_task_work_add(req);
} else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
!(req->ctx->flags & IORING_SETUP_IOPOLL)) {
- __io_req_complete_post(req);
+ __io_req_complete_post(req, issue_flags);
} else {
struct io_ring_ctx *ctx = req->ctx;
mutex_lock(&ctx->uring_lock);
- __io_req_complete_post(req);
+ __io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
mutex_unlock(&ctx->uring_lock);
}
}
@@ -1106,11 +1121,14 @@ static inline void io_dismantle_req(struct io_kiocb *req)
io_put_file(req->file);
}
-__cold void io_free_req(struct io_kiocb *req)
+static __cold void io_free_req_tw(struct io_kiocb *req, struct io_tw_state *ts)
{
struct io_ring_ctx *ctx = req->ctx;
- io_req_put_rsrc(req);
+ if (req->rsrc_node) {
+ io_tw_lock(ctx, ts);
+ io_put_rsrc_node(ctx, req->rsrc_node);
+ }
io_dismantle_req(req);
io_put_task_remote(req->task, 1);
@@ -1120,6 +1138,12 @@ __cold void io_free_req(struct io_kiocb *req)
spin_unlock(&ctx->completion_lock);
}
+__cold void io_free_req(struct io_kiocb *req)
+{
+ req->io_task_work.func = io_free_req_tw;
+ io_req_task_work_add(req);
+}
+
static void __io_req_find_next_prep(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
@@ -1146,22 +1170,23 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
return nxt;
}
-static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
+static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
{
if (!ctx)
return;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
- if (*locked) {
+ if (ts->locked) {
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
- *locked = false;
+ ts->locked = false;
}
percpu_ref_put(&ctx->refs);
}
static unsigned int handle_tw_list(struct llist_node *node,
- struct io_ring_ctx **ctx, bool *locked,
+ struct io_ring_ctx **ctx,
+ struct io_tw_state *ts,
struct llist_node *last)
{
unsigned int count = 0;
@@ -1174,18 +1199,17 @@ static unsigned int handle_tw_list(struct llist_node *node,
prefetch(container_of(next, struct io_kiocb, io_task_work.node));
if (req->ctx != *ctx) {
- ctx_flush_and_put(*ctx, locked);
+ ctx_flush_and_put(*ctx, ts);
*ctx = req->ctx;
/* if not contended, grab and improve batching */
- *locked = mutex_trylock(&(*ctx)->uring_lock);
+ ts->locked = mutex_trylock(&(*ctx)->uring_lock);
percpu_ref_get(&(*ctx)->refs);
- } else if (!*locked)
- *locked = mutex_trylock(&(*ctx)->uring_lock);
- req->io_task_work.func(req, locked);
+ }
+ req->io_task_work.func(req, ts);
node = next;
count++;
if (unlikely(need_resched())) {
- ctx_flush_and_put(*ctx, locked);
+ ctx_flush_and_put(*ctx, ts);
*ctx = NULL;
cond_resched();
}
@@ -1226,7 +1250,7 @@ static inline struct llist_node *io_llist_cmpxchg(struct llist_head *head,
void tctx_task_work(struct callback_head *cb)
{
- bool uring_locked = false;
+ struct io_tw_state ts = {};
struct io_ring_ctx *ctx = NULL;
struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
task_work);
@@ -1243,12 +1267,12 @@ void tctx_task_work(struct callback_head *cb)
do {
loops++;
node = io_llist_xchg(&tctx->task_list, &fake);
- count += handle_tw_list(node, &ctx, &uring_locked, &fake);
+ count += handle_tw_list(node, &ctx, &ts, &fake);
/* skip expensive cmpxchg if there are items in the list */
if (READ_ONCE(tctx->task_list.first) != &fake)
continue;
- if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
+ if (ts.locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
io_submit_flush_completions(ctx);
if (READ_ONCE(tctx->task_list.first) != &fake)
continue;
@@ -1256,7 +1280,7 @@ void tctx_task_work(struct callback_head *cb)
node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
} while (node != &fake);
- ctx_flush_and_put(ctx, &uring_locked);
+ ctx_flush_and_put(ctx, &ts);
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
@@ -1279,42 +1303,67 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx)
}
}
-static void io_req_local_work_add(struct io_kiocb *req)
+static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_ring_ctx *ctx = req->ctx;
+ unsigned nr_wait, nr_tw, nr_tw_prev;
+ struct llist_node *first;
- percpu_ref_get(&ctx->refs);
+ if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
+ flags &= ~IOU_F_TWQ_LAZY_WAKE;
- if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
- goto put_ref;
-
- /* needed for the following wake up */
- smp_mb__after_atomic();
-
- if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
- io_move_task_work_from_local(ctx);
- goto put_ref;
+ first = READ_ONCE(ctx->work_llist.first);
+ do {
+ nr_tw_prev = 0;
+ if (first) {
+ struct io_kiocb *first_req = container_of(first,
+ struct io_kiocb,
+ io_task_work.node);
+ /*
+ * Might be executed at any moment, rely on
+ * SLAB_TYPESAFE_BY_RCU to keep it alive.
+ */
+ nr_tw_prev = READ_ONCE(first_req->nr_tw);
+ }
+ nr_tw = nr_tw_prev + 1;
+ /* Large enough to fail the nr_wait comparison below */
+ if (!(flags & IOU_F_TWQ_LAZY_WAKE))
+ nr_tw = -1U;
+
+ req->nr_tw = nr_tw;
+ req->io_task_work.node.next = first;
+ } while (!try_cmpxchg(&ctx->work_llist.first, &first,
+ &req->io_task_work.node));
+
+ if (!first) {
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+ if (ctx->has_evfd)
+ io_eventfd_signal(ctx);
}
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
- if (ctx->has_evfd)
- io_eventfd_signal(ctx);
-
- if (READ_ONCE(ctx->cq_waiting))
- wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
-
-put_ref:
- percpu_ref_put(&ctx->refs);
+ nr_wait = atomic_read(&ctx->cq_wait_nr);
+ /* no one is waiting */
+ if (!nr_wait)
+ return;
+ /* either not enough or the previous add has already woken it up */
+ if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+ return;
+ /* pairs with set_current_state() in io_cqring_wait() */
+ smp_mb__after_atomic();
+ wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}
-void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
+void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_uring_task *tctx = req->task->io_uring;
struct io_ring_ctx *ctx = req->ctx;
- if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
- io_req_local_work_add(req);
+ if (!(flags & IOU_F_TWQ_FORCE_NORMAL) &&
+ (ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
+ rcu_read_lock();
+ io_req_local_work_add(req, flags);
+ rcu_read_unlock();
return;
}
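
A hedged recap of the lazy wakeup accounting introduced above (illustrative, not part of the patch): the waiter publishes how many completions it still needs, every lazily queued item records how deep the pending list has become, and only the add that crosses that threshold wakes the task.

/*
 * Worked example: io_cqring_wait() still needs 8 CQEs, so it publishes
 * atomic_set(&ctx->cq_wait_nr, 8). Lazily-added items stamp req->nr_tw with
 * the running count 1, 2, 3, ...; io_req_local_work_add() returns early while
 * nr_wait > nr_tw (not enough queued yet) or nr_tw_prev >= nr_wait (an earlier
 * add already crossed the threshold), so with purely lazy adds only the item
 * that reaches nr_tw == 8 calls wake_up_state(). An add without
 * IOU_F_TWQ_LAZY_WAKE forces nr_tw = -1U, large enough that the first check
 * never suppresses the wakeup.
 */
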
@@ -1341,11 +1390,11 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
io_task_work.node);
node = node->next;
- __io_req_task_work_add(req, false);
+ __io_req_task_work_add(req, IOU_F_TWQ_FORCE_NORMAL);
}
}
-static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
+static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
{
struct llist_node *node;
unsigned int loops = 0;
@@ -1362,7 +1411,7 @@ again:
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
prefetch(container_of(next, struct io_kiocb, io_task_work.node));
- req->io_task_work.func(req, locked);
+ req->io_task_work.func(req, ts);
ret++;
node = next;
}
@@ -1370,7 +1419,7 @@ again:
if (!llist_empty(&ctx->work_llist))
goto again;
- if (*locked) {
+ if (ts->locked) {
io_submit_flush_completions(ctx);
if (!llist_empty(&ctx->work_llist))
goto again;
@@ -1381,46 +1430,46 @@ again:
static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
{
- bool locked;
+ struct io_tw_state ts = { .locked = true, };
int ret;
if (llist_empty(&ctx->work_llist))
return 0;
- locked = true;
- ret = __io_run_local_work(ctx, &locked);
+ ret = __io_run_local_work(ctx, &ts);
/* shouldn't happen! */
- if (WARN_ON_ONCE(!locked))
+ if (WARN_ON_ONCE(!ts.locked))
mutex_lock(&ctx->uring_lock);
return ret;
}
static int io_run_local_work(struct io_ring_ctx *ctx)
{
- bool locked = mutex_trylock(&ctx->uring_lock);
+ struct io_tw_state ts = {};
int ret;
- ret = __io_run_local_work(ctx, &locked);
- if (locked)
+ ts.locked = mutex_trylock(&ctx->uring_lock);
+ ret = __io_run_local_work(ctx, &ts);
+ if (ts.locked)
mutex_unlock(&ctx->uring_lock);
return ret;
}
-static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
+static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
{
- io_tw_lock(req->ctx, locked);
+ io_tw_lock(req->ctx, ts);
io_req_defer_failed(req, req->cqe.res);
}
-void io_req_task_submit(struct io_kiocb *req, bool *locked)
+void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
{
- io_tw_lock(req->ctx, locked);
+ io_tw_lock(req->ctx, ts);
/* req->task == current here, checking PF_EXITING is safe */
if (unlikely(req->task->flags & PF_EXITING))
io_req_defer_failed(req, -EFAULT);
else if (req->flags & REQ_F_FORCE_ASYNC)
- io_queue_iowq(req, locked);
+ io_queue_iowq(req, ts);
else
io_queue_sqe(req);
}
@@ -1499,14 +1548,14 @@ void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node)
static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
__must_hold(&ctx->uring_lock)
{
- struct io_wq_work_node *node, *prev;
struct io_submit_state *state = &ctx->submit_state;
+ struct io_wq_work_node *node;
__io_cq_lock(ctx);
/* must come first to preserve CQE ordering in failure cases */
if (state->cqes_count)
__io_flush_post_cqes(ctx);
- wq_list_for_each(node, prev, &state->compl_reqs) {
+ __wq_list_for_each(node, &state->compl_reqs) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
comp_list);
@@ -1646,9 +1695,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
return ret;
}
-void io_req_task_complete(struct io_kiocb *req, bool *locked)
+void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
{
- if (*locked)
+ if (ts->locked)
io_req_complete_defer(req);
else
io_req_complete_post(req, IO_URING_F_UNLOCKED);
@@ -1927,9 +1976,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
return 0;
}
-int io_poll_issue(struct io_kiocb *req, bool *locked)
+int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
{
- io_tw_lock(req->ctx, locked);
+ io_tw_lock(req->ctx, ts);
return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
IO_URING_F_COMPLETE_DEFER);
}
@@ -2298,8 +2347,7 @@ static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
if (unlikely(ret))
return io_submit_fail_init(sqe, req, ret);
- /* don't need @sqe from now on */
- trace_io_uring_submit_sqe(req, true);
+ trace_io_uring_submit_req(req);
/*
* If we already have a head request, queue this one for async
@@ -2428,7 +2476,7 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
if (unlikely(!entries))
return 0;
/* make sure SQ entry isn't read before tail */
- ret = left = min3(nr, ctx->sq_entries, entries);
+ ret = left = min(nr, entries);
io_get_task_refs(left);
io_submit_state_start(&ctx->submit_state, left);
@@ -2600,7 +2648,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
unsigned long check_cq;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
- WRITE_ONCE(ctx->cq_waiting, 1);
+ int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
+
+ atomic_set(&ctx->cq_wait_nr, nr_wait);
set_current_state(TASK_INTERRUPTIBLE);
} else {
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
@@ -2609,7 +2659,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
ret = io_cqring_wait_schedule(ctx, &iowq);
__set_current_state(TASK_RUNNING);
- WRITE_ONCE(ctx->cq_waiting, 0);
+ atomic_set(&ctx->cq_wait_nr, 0);
if (ret < 0)
break;
@@ -2772,13 +2822,17 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
mutex_unlock(&ctx->uring_lock);
}
+static void io_rsrc_node_cache_free(struct io_cache_entry *entry)
+{
+ kfree(container_of(entry, struct io_rsrc_node, cache));
+}
+
static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
{
io_sq_thread_finish(ctx);
- io_rsrc_refs_drop(ctx);
/* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
- io_wait_rsrc_data(ctx->buf_data);
- io_wait_rsrc_data(ctx->file_data);
+ if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)))
+ return;
mutex_lock(&ctx->uring_lock);
if (ctx->buf_data)
@@ -2789,8 +2843,8 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_eventfd_unregister(ctx);
io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
- mutex_unlock(&ctx->uring_lock);
io_destroy_buffers(ctx);
+ mutex_unlock(&ctx->uring_lock);
if (ctx->sq_creds)
put_cred(ctx->sq_creds);
if (ctx->submitter_task)
@@ -2798,14 +2852,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
/* there are no registered resources left, nobody uses it */
if (ctx->rsrc_node)
- io_rsrc_node_destroy(ctx->rsrc_node);
- if (ctx->rsrc_backup_node)
- io_rsrc_node_destroy(ctx->rsrc_backup_node);
- flush_delayed_work(&ctx->rsrc_put_work);
- flush_delayed_work(&ctx->fallback_work);
+ io_rsrc_node_destroy(ctx, ctx->rsrc_node);
WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
- WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
#if defined(CONFIG_UNIX)
if (ctx->ring_sock) {
@@ -2815,6 +2864,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
#endif
WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
+ io_alloc_cache_free(&ctx->rsrc_node_cache, io_rsrc_node_cache_free);
if (ctx->mm_account) {
mmdrop(ctx->mm_account);
ctx->mm_account = NULL;
@@ -3031,6 +3081,10 @@ static __cold void io_ring_exit_work(struct work_struct *work)
spin_lock(&ctx->completion_lock);
spin_unlock(&ctx->completion_lock);
+ /* pairs with RCU read section in io_req_local_work_add() */
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ synchronize_rcu();
+
io_ring_ctx_free(ctx);
}
@@ -3146,6 +3200,12 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
enum io_wq_cancel cret;
bool ret = false;
+ /* set it so io_req_local_work_add() would wake us up */
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ atomic_set(&ctx->cq_wait_nr, 1);
+ smp_mb();
+ }
+
/* failed during ring init, it couldn't have issued any requests */
if (!ctx->rings)
return false;
@@ -3200,6 +3260,8 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
{
struct io_uring_task *tctx = current->io_uring;
struct io_ring_ctx *ctx;
+ struct io_tctx_node *node;
+ unsigned long index;
s64 inflight;
DEFINE_WAIT(wait);
@@ -3221,9 +3283,6 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
break;
if (!sqd) {
- struct io_tctx_node *node;
- unsigned long index;
-
xa_for_each(&tctx->xa, index, node) {
/* sqpoll task will cancel all its requests */
if (node->ctx->sq_data)
@@ -3246,7 +3305,13 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
io_run_task_work();
io_uring_drop_tctx_refs(current);
-
+ xa_for_each(&tctx->xa, index, node) {
+ if (!llist_empty(&node->ctx->work_llist)) {
+ WARN_ON_ONCE(node->ctx->submitter_task &&
+ node->ctx->submitter_task != current);
+ goto end_wait;
+ }
+ }
/*
* If we've seen completions, retry without waiting. This
* avoids a race where a completion comes in before we did
@@ -3254,6 +3319,7 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
*/
if (inflight == tctx_inflight(tctx, !cancel_all))
schedule();
+end_wait:
finish_wait(&tctx->wait, &wait);
} while (1);
@@ -3282,7 +3348,7 @@ static void *io_uring_validate_mmap_request(struct file *file,
struct page *page;
void *ptr;
- switch (offset) {
+ switch (offset & IORING_OFF_MMAP_MASK) {
case IORING_OFF_SQ_RING:
case IORING_OFF_CQ_RING:
ptr = ctx->rings;
@@ -3290,6 +3356,17 @@ static void *io_uring_validate_mmap_request(struct file *file,
case IORING_OFF_SQES:
ptr = ctx->sq_sqes;
break;
+ case IORING_OFF_PBUF_RING: {
+ unsigned int bgid;
+
+ bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT;
+ mutex_lock(&ctx->uring_lock);
+ ptr = io_pbuf_get_address(ctx, bgid);
+ mutex_unlock(&ctx->uring_lock);
+ if (!ptr)
+ return ERR_PTR(-EINVAL);
+ break;
+ }
default:
return ERR_PTR(-EINVAL);
}
@@ -3317,6 +3394,54 @@ static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
}
+static unsigned long io_uring_mmu_get_unmapped_area(struct file *filp,
+ unsigned long addr, unsigned long len,
+ unsigned long pgoff, unsigned long flags)
+{
+ const unsigned long mmap_end = arch_get_mmap_end(addr, len, flags);
+ struct vm_unmapped_area_info info;
+ void *ptr;
+
+ /*
+ * Do not allow mapping to a user-provided address to avoid breaking the
+ * aliasing rules. Userspace is not able to guess the offset address of
+ * a kernel kmalloc()ed memory area.
+ */
+ if (addr)
+ return -EINVAL;
+
+ ptr = io_uring_validate_mmap_request(filp, pgoff, len);
+ if (IS_ERR(ptr))
+ return -ENOMEM;
+
+ info.flags = VM_UNMAPPED_AREA_TOPDOWN;
+ info.length = len;
+ info.low_limit = max(PAGE_SIZE, mmap_min_addr);
+ info.high_limit = arch_get_mmap_base(addr, current->mm->mmap_base);
+#ifdef SHM_COLOUR
+ info.align_mask = PAGE_MASK & (SHM_COLOUR - 1UL);
+#else
+ info.align_mask = PAGE_MASK & (SHMLBA - 1UL);
+#endif
+ info.align_offset = (unsigned long) ptr;
+
+ /*
+ * A failed mmap() very likely causes application failure,
+ * so fall back to the bottom-up function here. This scenario
+ * can happen with large stack limits and large mmap()
+ * allocations.
+ */
+ addr = vm_unmapped_area(&info);
+ if (offset_in_page(addr)) {
+ info.flags = 0;
+ info.low_limit = TASK_UNMAPPED_BASE;
+ info.high_limit = mmap_end;
+ addr = vm_unmapped_area(&info);
+ }
+
+ return addr;
+}
+
#else /* !CONFIG_MMU */
static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
@@ -3529,6 +3654,8 @@ static const struct file_operations io_uring_fops = {
#ifndef CONFIG_MMU
.get_unmapped_area = io_uring_nommu_get_unmapped_area,
.mmap_capabilities = io_uring_nommu_mmap_capabilities,
+#else
+ .get_unmapped_area = io_uring_mmu_get_unmapped_area,
#endif
.poll = io_uring_poll,
#ifdef CONFIG_PROC_FS
@@ -3755,11 +3882,10 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
ret = io_sq_offload_create(ctx, p);
if (ret)
goto err;
- /* always set a rsrc node */
- ret = io_rsrc_node_switch_start(ctx);
+
+ ret = io_rsrc_init(ctx);
if (ret)
goto err;
- io_rsrc_node_switch(ctx, NULL);
memset(&p->sq_off, 0, sizeof(p->sq_off));
p->sq_off.head = offsetof(struct io_rings, sq.head);
@@ -4425,7 +4551,7 @@ static int __init io_uring_init(void)
io_uring_optable_init();
req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
- SLAB_ACCOUNT);
+ SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU);
return 0;
};
__initcall(io_uring_init);
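
The cq_wait_nr change above is what lets deferred task_work hold back wakeups until the waiter can actually be satisfied. A minimal liburing sketch of the setup that benefits from it, assuming a liburing and kernel that already expose IORING_SETUP_SINGLE_ISSUER and IORING_SETUP_DEFER_TASKRUN (both predate this series); only the ring flags and the batched wait matter here:

	#include <liburing.h>
	#include <stdio.h>

	int main(void)
	{
		struct io_uring ring;
		struct io_uring_cqe *cqe;
		int ret, i;

		/* DEFER_TASKRUN requires SINGLE_ISSUER; completions are then only
		 * reaped by the submitting task, which is what cq_wait_nr keys off */
		ret = io_uring_queue_init(64, &ring, IORING_SETUP_SINGLE_ISSUER |
						     IORING_SETUP_DEFER_TASKRUN);
		if (ret < 0) {
			fprintf(stderr, "queue_init: %d\n", ret);
			return 1;
		}

		for (i = 0; i < 8; i++)
			io_uring_prep_nop(io_uring_get_sqe(&ring));

		/* waiting for all eight at once lets the kernel delay wakeups
		 * until enough task_work has queued to cover the request */
		ret = io_uring_submit_and_wait(&ring, 8);
		if (ret < 0)
			return 1;

		while (!io_uring_peek_cqe(&ring, &cqe))
			io_uring_cqe_seen(&ring, cqe);

		io_uring_queue_exit(&ring);
		return 0;
	}
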
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 2711865f1e19..25515d69d205 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -16,6 +16,20 @@
#endif
enum {
+ /* don't use deferred task_work */
+ IOU_F_TWQ_FORCE_NORMAL = 1,
+
+ /*
+ * A hint to not wake right away but delay until there are enough of
+ * tw's queued to match the number of CQEs the task is waiting for.
+ *
+ * Must not be used with requests generating more than one CQE.
+ * It's also ignored unless IORING_SETUP_DEFER_TASKRUN is set.
+ */
+ IOU_F_TWQ_LAZY_WAKE = 2,
+};
+
+enum {
IOU_OK = 0,
IOU_ISSUE_SKIP_COMPLETE = -EIOCBQUEUED,
@@ -48,20 +62,20 @@ static inline bool io_req_ffs_set(struct io_kiocb *req)
return req->flags & REQ_F_FIXED_FILE;
}
-void __io_req_task_work_add(struct io_kiocb *req, bool allow_local);
+void __io_req_task_work_add(struct io_kiocb *req, unsigned flags);
bool io_is_uring_fops(struct file *file);
bool io_alloc_async_data(struct io_kiocb *req);
void io_req_task_queue(struct io_kiocb *req);
-void io_queue_iowq(struct io_kiocb *req, bool *dont_use);
-void io_req_task_complete(struct io_kiocb *req, bool *locked);
+void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use);
+void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
-void io_req_task_submit(struct io_kiocb *req, bool *locked);
+void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
void tctx_task_work(struct callback_head *cb);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
int io_uring_alloc_task_context(struct task_struct *task,
struct io_ring_ctx *ctx);
-int io_poll_issue(struct io_kiocb *req, bool *locked);
+int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts);
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node);
@@ -80,6 +94,8 @@ bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
#define io_lockdep_assert_cq_locked(ctx) \
do { \
+ lockdep_assert(in_task()); \
+ \
if (ctx->flags & IORING_SETUP_IOPOLL) { \
lockdep_assert_held(&ctx->uring_lock); \
} else if (!ctx->task_complete) { \
@@ -93,7 +109,7 @@ bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
static inline void io_req_task_work_add(struct io_kiocb *req)
{
- __io_req_task_work_add(req, true);
+ __io_req_task_work_add(req, 0);
}
#define io_for_each_link(pos, head) \
@@ -228,8 +244,7 @@ static inline void io_poll_wq_wake(struct io_ring_ctx *ctx)
poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
}
-/* requires smp_mb() prior, see wq_has_sleeper() */
-static inline void __io_cqring_wake(struct io_ring_ctx *ctx)
+static inline void io_cqring_wake(struct io_ring_ctx *ctx)
{
/*
* Trigger waitqueue handler on all waiters on our waitqueue. This
@@ -241,17 +256,11 @@ static inline void __io_cqring_wake(struct io_ring_ctx *ctx)
* waitqueue handlers, we know we have a dependency between eventfd or
* epoll and should terminate multishot poll at that point.
*/
- if (waitqueue_active(&ctx->cq_wait))
+ if (wq_has_sleeper(&ctx->cq_wait))
__wake_up(&ctx->cq_wait, TASK_NORMAL, 0,
poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
}
-static inline void io_cqring_wake(struct io_ring_ctx *ctx)
-{
- smp_mb();
- __io_cqring_wake(ctx);
-}
-
static inline bool io_sqring_full(struct io_ring_ctx *ctx)
{
struct io_rings *r = ctx->rings;
@@ -262,9 +271,11 @@ static inline bool io_sqring_full(struct io_ring_ctx *ctx)
static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
{
struct io_rings *rings = ctx->rings;
+ unsigned int entries;
/* make sure SQ entry isn't read before tail */
- return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
+ entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
+ return min(entries, ctx->sq_entries);
}
static inline int io_run_task_work(void)
@@ -299,11 +310,11 @@ static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
return task_work_pending(current) || !wq_list_empty(&ctx->work_llist);
}
-static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
+static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts)
{
- if (!*locked) {
+ if (!ts->locked) {
mutex_lock(&ctx->uring_lock);
- *locked = true;
+ ts->locked = true;
}
}
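
With the conversion above, every task_work callback takes a struct io_tw_state instead of a bare bool, and the lock state travels inside it. A hedged in-kernel sketch of a callback under the new convention, using only helpers visible in this patch; the function name itself is made up for illustration:

	static void io_req_task_example(struct io_kiocb *req, struct io_tw_state *ts)
	{
		/* grabs ->uring_lock if the tw runner didn't; recorded in ts->locked */
		io_tw_lock(req->ctx, ts);

		io_req_set_res(req, 0, 0);
		/* defers the completion when ts->locked, posts it directly otherwise */
		io_req_task_complete(req, ts);
	}

The struct is trivially copyable, presumably so more per-run state can be added later without churning every callback signature again.
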
diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c
index 3002dc827195..2f0181521c98 100644
--- a/io_uring/kbuf.c
+++ b/io_uring/kbuf.c
@@ -137,7 +137,8 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
return NULL;
head &= bl->mask;
- if (head < IO_BUFFER_LIST_BUF_PER_PAGE) {
+ /* mmaped buffers are always contig */
+ if (bl->is_mmap || head < IO_BUFFER_LIST_BUF_PER_PAGE) {
buf = &br->bufs[head];
} else {
int off = head & (IO_BUFFER_LIST_BUF_PER_PAGE - 1);
@@ -179,7 +180,7 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
bl = io_buffer_get_list(ctx, req->buf_index);
if (likely(bl)) {
- if (bl->buf_nr_pages)
+ if (bl->is_mapped)
ret = io_ring_buffer_select(req, len, bl, issue_flags);
else
ret = io_provided_buffer_select(req, len, bl);
@@ -214,31 +215,43 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx,
if (!nbufs)
return 0;
- if (bl->buf_nr_pages) {
- int j;
-
+ if (bl->is_mapped) {
i = bl->buf_ring->tail - bl->head;
- for (j = 0; j < bl->buf_nr_pages; j++)
- unpin_user_page(bl->buf_pages[j]);
- kvfree(bl->buf_pages);
- bl->buf_pages = NULL;
- bl->buf_nr_pages = 0;
+ if (bl->is_mmap) {
+ struct page *page;
+
+ page = virt_to_head_page(bl->buf_ring);
+ if (put_page_testzero(page))
+ free_compound_page(page);
+ bl->buf_ring = NULL;
+ bl->is_mmap = 0;
+ } else if (bl->buf_nr_pages) {
+ int j;
+
+ for (j = 0; j < bl->buf_nr_pages; j++)
+ unpin_user_page(bl->buf_pages[j]);
+ kvfree(bl->buf_pages);
+ bl->buf_pages = NULL;
+ bl->buf_nr_pages = 0;
+ }
/* make sure it's seen as empty */
INIT_LIST_HEAD(&bl->buf_list);
+ bl->is_mapped = 0;
return i;
}
- /* the head kbuf is the list itself */
+ /* protects io_buffers_cache */
+ lockdep_assert_held(&ctx->uring_lock);
+
while (!list_empty(&bl->buf_list)) {
struct io_buffer *nxt;
nxt = list_first_entry(&bl->buf_list, struct io_buffer, list);
- list_del(&nxt->list);
+ list_move(&nxt->list, &ctx->io_buffers_cache);
if (++i == nbufs)
return i;
cond_resched();
}
- i++;
return i;
}
@@ -303,7 +316,7 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
if (bl) {
ret = -EINVAL;
/* can't use provide/remove buffers command on mapped buffers */
- if (!bl->buf_nr_pages)
+ if (!bl->is_mapped)
ret = __io_remove_buffers(ctx, bl, p->nbufs);
}
io_ring_submit_unlock(ctx, issue_flags);
@@ -448,7 +461,7 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
}
}
/* can't add buffers via this command for a mapped buffer ring */
- if (bl->buf_nr_pages) {
+ if (bl->is_mapped) {
ret = -EINVAL;
goto err;
}
@@ -463,23 +476,87 @@ err:
return IOU_OK;
}
-int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
+static int io_pin_pbuf_ring(struct io_uring_buf_reg *reg,
+ struct io_buffer_list *bl)
{
struct io_uring_buf_ring *br;
- struct io_uring_buf_reg reg;
- struct io_buffer_list *bl, *free_bl = NULL;
struct page **pages;
int nr_pages;
+ pages = io_pin_pages(reg->ring_addr,
+ flex_array_size(br, bufs, reg->ring_entries),
+ &nr_pages);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ br = page_address(pages[0]);
+#ifdef SHM_COLOUR
+ /*
+ * On platforms that have specific aliasing requirements, SHM_COLOUR
+ * is set and we must guarantee that the kernel and user side align
+ * nicely. We cannot do that if IOU_PBUF_RING_MMAP isn't set and
+ * the application mmap's the provided ring buffer. Fail the request
+ * if we, by chance, don't end up with aligned addresses. The app
+ * should use IOU_PBUF_RING_MMAP instead, and liburing will handle
+ * this transparently.
+ */
+ if ((reg->ring_addr | (unsigned long) br) & (SHM_COLOUR - 1)) {
+ int i;
+
+ for (i = 0; i < nr_pages; i++)
+ unpin_user_page(pages[i]);
+ return -EINVAL;
+ }
+#endif
+ bl->buf_pages = pages;
+ bl->buf_nr_pages = nr_pages;
+ bl->buf_ring = br;
+ bl->is_mapped = 1;
+ bl->is_mmap = 0;
+ return 0;
+}
+
+static int io_alloc_pbuf_ring(struct io_uring_buf_reg *reg,
+ struct io_buffer_list *bl)
+{
+ gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
+ size_t ring_size;
+ void *ptr;
+
+ ring_size = reg->ring_entries * sizeof(struct io_uring_buf_ring);
+ ptr = (void *) __get_free_pages(gfp, get_order(ring_size));
+ if (!ptr)
+ return -ENOMEM;
+
+ bl->buf_ring = ptr;
+ bl->is_mapped = 1;
+ bl->is_mmap = 1;
+ return 0;
+}
+
+int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
+{
+ struct io_uring_buf_reg reg;
+ struct io_buffer_list *bl, *free_bl = NULL;
+ int ret;
+
if (copy_from_user(&reg, arg, sizeof(reg)))
return -EFAULT;
- if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
+ if (reg.resv[0] || reg.resv[1] || reg.resv[2])
return -EINVAL;
- if (!reg.ring_addr)
- return -EFAULT;
- if (reg.ring_addr & ~PAGE_MASK)
+ if (reg.flags & ~IOU_PBUF_RING_MMAP)
return -EINVAL;
+ if (!(reg.flags & IOU_PBUF_RING_MMAP)) {
+ if (!reg.ring_addr)
+ return -EFAULT;
+ if (reg.ring_addr & ~PAGE_MASK)
+ return -EINVAL;
+ } else {
+ if (reg.ring_addr)
+ return -EINVAL;
+ }
+
if (!is_power_of_2(reg.ring_entries))
return -EINVAL;
@@ -496,7 +573,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
bl = io_buffer_get_list(ctx, reg.bgid);
if (bl) {
/* if mapped buffer ring OR classic exists, don't allow */
- if (bl->buf_nr_pages || !list_empty(&bl->buf_list))
+ if (bl->is_mapped || !list_empty(&bl->buf_list))
return -EEXIST;
} else {
free_bl = bl = kzalloc(sizeof(*bl), GFP_KERNEL);
@@ -504,22 +581,21 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
return -ENOMEM;
}
- pages = io_pin_pages(reg.ring_addr,
- flex_array_size(br, bufs, reg.ring_entries),
- &nr_pages);
- if (IS_ERR(pages)) {
- kfree(free_bl);
- return PTR_ERR(pages);
+ if (!(reg.flags & IOU_PBUF_RING_MMAP))
+ ret = io_pin_pbuf_ring(&reg, bl);
+ else
+ ret = io_alloc_pbuf_ring(&reg, bl);
+
+ if (!ret) {
+ bl->nr_entries = reg.ring_entries;
+ bl->mask = reg.ring_entries - 1;
+
+ io_buffer_add_list(ctx, bl, reg.bgid);
+ return 0;
}
- br = page_address(pages[0]);
- bl->buf_pages = pages;
- bl->buf_nr_pages = nr_pages;
- bl->nr_entries = reg.ring_entries;
- bl->buf_ring = br;
- bl->mask = reg.ring_entries - 1;
- io_buffer_add_list(ctx, bl, reg.bgid);
- return 0;
+ kfree(free_bl);
+ return ret;
}
int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
@@ -529,13 +605,15 @@ int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
if (copy_from_user(&reg, arg, sizeof(reg)))
return -EFAULT;
- if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
+ if (reg.resv[0] || reg.resv[1] || reg.resv[2])
+ return -EINVAL;
+ if (reg.flags)
return -EINVAL;
bl = io_buffer_get_list(ctx, reg.bgid);
if (!bl)
return -ENOENT;
- if (!bl->buf_nr_pages)
+ if (!bl->is_mapped)
return -EINVAL;
__io_remove_buffers(ctx, bl, -1U);
@@ -545,3 +623,14 @@ int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
}
return 0;
}
+
+void *io_pbuf_get_address(struct io_ring_ctx *ctx, unsigned long bgid)
+{
+ struct io_buffer_list *bl;
+
+ bl = io_buffer_get_list(ctx, bgid);
+ if (!bl || !bl->is_mmap)
+ return NULL;
+
+ return bl->buf_ring;
+}
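
For the IOU_PBUF_RING_MMAP path added above, the application leaves ring_addr at zero, lets the kernel allocate the ring, and maps it through the ring fd afterwards. A rough userspace sketch, assuming the updated uapi headers from this series are installed (they provide IOU_PBUF_RING_MMAP, IORING_OFF_PBUF_RING and IORING_OFF_PBUF_SHIFT); the mmap offset encoding simply mirrors the decode in io_uring_validate_mmap_request():

	#include <stdio.h>
	#include <string.h>
	#include <sys/mman.h>
	#include <sys/syscall.h>
	#include <unistd.h>
	#include <linux/io_uring.h>

	int main(void)
	{
		struct io_uring_params p;
		struct io_uring_buf_reg reg;
		struct io_uring_buf_ring *br;
		size_t len;
		off_t off;
		int fd;

		memset(&p, 0, sizeof(p));
		fd = syscall(__NR_io_uring_setup, 8, &p);
		if (fd < 0) {
			perror("io_uring_setup");
			return 1;
		}

		memset(&reg, 0, sizeof(reg));
		reg.ring_entries = 32;			/* must be a power of two */
		reg.bgid = 7;
		reg.flags = IOU_PBUF_RING_MMAP;		/* kernel allocates, ring_addr stays 0 */

		if (syscall(__NR_io_uring_register, fd, IORING_REGISTER_PBUF_RING, &reg, 1)) {
			perror("IORING_REGISTER_PBUF_RING");
			return 1;
		}

		/* pgoff carries the buffer group id, shifted as the kernel expects */
		off = IORING_OFF_PBUF_RING | ((off_t)reg.bgid << IORING_OFF_PBUF_SHIFT);
		len = reg.ring_entries * sizeof(struct io_uring_buf_ring);
		br = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, off);
		if (br == MAP_FAILED) {
			perror("mmap");
			return 1;
		}

		printf("bgid %u mapped, tail=%u\n", reg.bgid, br->tail);
		return 0;
	}

liburing is expected to wrap this so applications never build the offset by hand, as the SHM_COLOUR comment in io_pin_pbuf_ring() already hints.
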
diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h
index c23e15d7d3ca..d14345ef61fc 100644
--- a/io_uring/kbuf.h
+++ b/io_uring/kbuf.h
@@ -23,6 +23,11 @@ struct io_buffer_list {
__u16 nr_entries;
__u16 head;
__u16 mask;
+
+ /* ring mapped provided buffers */
+ __u8 is_mapped;
+ /* ring mapped provided buffers, but mmap'ed by application */
+ __u8 is_mmap;
};
struct io_buffer {
@@ -50,6 +55,8 @@ unsigned int __io_put_kbuf(struct io_kiocb *req, unsigned issue_flags);
void io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags);
+void *io_pbuf_get_address(struct io_ring_ctx *ctx, unsigned long bgid);
+
static inline void io_kbuf_recycle_ring(struct io_kiocb *req)
{
/*
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 8803c0979e2a..85fd7ce5f05b 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -202,7 +202,7 @@ static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flag
* completes with -EOVERFLOW, then the sender must ensure that a
* later IORING_OP_MSG_RING delivers the message.
*/
- if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
+ if (!io_post_aux_cqe(target_ctx, msg->user_data, ret, 0))
ret = -EOVERFLOW;
out_unlock:
io_double_unlock_ctx(target_ctx);
@@ -229,6 +229,8 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
struct io_ring_ctx *ctx = req->ctx;
struct file *src_file = msg->src_file;
+ if (msg->len)
+ return -EINVAL;
if (target_ctx == ctx)
return -EINVAL;
if (target_ctx->flags & IORING_SETUP_R_DISABLED)
diff --git a/io_uring/net.c b/io_uring/net.c
index b7f190ca528e..89e839013837 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -47,6 +47,7 @@ struct io_connect {
struct sockaddr __user *addr;
int addr_len;
bool in_progress;
+ bool seen_econnaborted;
};
struct io_sr_msg {
@@ -183,8 +184,8 @@ static int io_setup_async_msg(struct io_kiocb *req,
async_msg->msg.msg_name = &async_msg->addr;
/* if we're using fast_iov, set it to the new one */
if (iter_is_iovec(&kmsg->msg.msg_iter) && !kmsg->free_iov) {
- size_t fast_idx = kmsg->msg.msg_iter.iov - kmsg->fast_iov;
- async_msg->msg.msg_iter.iov = &async_msg->fast_iov[fast_idx];
+ size_t fast_idx = iter_iov(&kmsg->msg.msg_iter) - kmsg->fast_iov;
+ async_msg->msg.msg_iter.__iov = &async_msg->fast_iov[fast_idx];
}
return -EAGAIN;
@@ -1424,7 +1425,7 @@ int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
conn->addr_len = READ_ONCE(sqe->addr2);
- conn->in_progress = false;
+ conn->in_progress = conn->seen_econnaborted = false;
return 0;
}
@@ -1461,18 +1462,24 @@ int io_connect(struct io_kiocb *req, unsigned int issue_flags)
ret = __sys_connect_file(req->file, &io->address,
connect->addr_len, file_flags);
- if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
+ if ((ret == -EAGAIN || ret == -EINPROGRESS || ret == -ECONNABORTED)
+ && force_nonblock) {
if (ret == -EINPROGRESS) {
connect->in_progress = true;
- } else {
- if (req_has_async_data(req))
- return -EAGAIN;
- if (io_alloc_async_data(req)) {
- ret = -ENOMEM;
+ return -EAGAIN;
+ }
+ if (ret == -ECONNABORTED) {
+ if (connect->seen_econnaborted)
goto out;
- }
- memcpy(req->async_data, &__io, sizeof(__io));
+ connect->seen_econnaborted = true;
+ }
+ if (req_has_async_data(req))
+ return -EAGAIN;
+ if (io_alloc_async_data(req)) {
+ ret = -ENOMEM;
+ goto out;
}
+ memcpy(req->async_data, &__io, sizeof(__io));
return -EAGAIN;
}
if (ret == -ERESTARTSYS)
diff --git a/io_uring/net.h b/io_uring/net.h
index 5ffa11bf5d2e..191009979bcb 100644
--- a/io_uring/net.h
+++ b/io_uring/net.h
@@ -5,8 +5,8 @@
#include "alloc_cache.h"
-#if defined(CONFIG_NET)
struct io_async_msghdr {
+#if defined(CONFIG_NET)
union {
struct iovec fast_iov[UIO_FASTIOV];
struct {
@@ -22,8 +22,11 @@ struct io_async_msghdr {
struct sockaddr __user *uaddr;
struct msghdr msg;
struct sockaddr_storage addr;
+#endif
};
+#if defined(CONFIG_NET)
+
struct io_async_connect {
struct sockaddr_storage address;
};
diff --git a/io_uring/notif.c b/io_uring/notif.c
index 09dfd0832d19..d3e703c37aba 100644
--- a/io_uring/notif.c
+++ b/io_uring/notif.c
@@ -9,7 +9,7 @@
#include "notif.h"
#include "rsrc.h"
-static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked)
+static void io_notif_complete_tw_ext(struct io_kiocb *notif, struct io_tw_state *ts)
{
struct io_notif_data *nd = io_notif_to_data(notif);
struct io_ring_ctx *ctx = notif->ctx;
@@ -21,7 +21,7 @@ static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked)
__io_unaccount_mem(ctx->user, nd->account_pages);
nd->account_pages = 0;
}
- io_req_task_complete(notif, locked);
+ io_req_task_complete(notif, ts);
}
static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
@@ -31,7 +31,7 @@ static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
struct io_kiocb *notif = cmd_to_io_kiocb(nd);
if (refcount_dec_and_test(&uarg->refcnt))
- io_req_task_work_add(notif);
+ __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}
static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg,
@@ -79,7 +79,7 @@ struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx)
notif->io_task_work.func = io_req_task_complete;
nd = io_notif_to_data(notif);
- nd->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN;
+ nd->uarg.flags = IO_NOTIF_UBUF_FLAGS;
nd->uarg.callback = io_tx_ubuf_callback;
refcount_set(&nd->uarg.refcnt, 1);
return notif;
diff --git a/io_uring/notif.h b/io_uring/notif.h
index c88c800cd89d..86d32bd9f856 100644
--- a/io_uring/notif.h
+++ b/io_uring/notif.h
@@ -7,6 +7,7 @@
#include "rsrc.h"
+#define IO_NOTIF_UBUF_FLAGS (SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN)
#define IO_NOTIF_SPLICE_BATCH 32
struct io_notif_data {
@@ -33,7 +34,7 @@ static inline void io_notif_flush(struct io_kiocb *notif)
/* drop slot's master ref */
if (refcount_dec_and_test(&nd->uarg.refcnt))
- io_req_task_work_add(notif);
+ __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}
static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len)
diff --git a/io_uring/poll.c b/io_uring/poll.c
index 795facbd0e9f..c90e47dc1e29 100644
--- a/io_uring/poll.c
+++ b/io_uring/poll.c
@@ -148,7 +148,7 @@ static void io_poll_req_insert_locked(struct io_kiocb *req)
hlist_add_head(&req->hash_node, &table->hbs[index].list);
}
-static void io_poll_tw_hash_eject(struct io_kiocb *req, bool *locked)
+static void io_poll_tw_hash_eject(struct io_kiocb *req, struct io_tw_state *ts)
{
struct io_ring_ctx *ctx = req->ctx;
@@ -159,7 +159,7 @@ static void io_poll_tw_hash_eject(struct io_kiocb *req, bool *locked)
* already grabbed the mutex for us, but there is a chance it
* failed.
*/
- io_tw_lock(ctx, locked);
+ io_tw_lock(ctx, ts);
hash_del(&req->hash_node);
req->flags &= ~REQ_F_HASH_LOCKED;
} else {
@@ -238,7 +238,7 @@ enum {
* req->cqe.res. IOU_POLL_REMOVE_POLL_USE_RES indicates to remove multishot
* poll and that the result is stored in req->cqe.
*/
-static int io_poll_check_events(struct io_kiocb *req, bool *locked)
+static int io_poll_check_events(struct io_kiocb *req, struct io_tw_state *ts)
{
int v;
@@ -300,13 +300,13 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
__poll_t mask = mangle_poll(req->cqe.res &
req->apoll_events);
- if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data,
+ if (!io_aux_cqe(req->ctx, ts->locked, req->cqe.user_data,
mask, IORING_CQE_F_MORE, false)) {
io_req_set_res(req, mask, 0);
return IOU_POLL_REMOVE_POLL_USE_RES;
}
} else {
- int ret = io_poll_issue(req, locked);
+ int ret = io_poll_issue(req, ts);
if (ret == IOU_STOP_MULTISHOT)
return IOU_POLL_REMOVE_POLL_USE_RES;
if (ret < 0)
@@ -326,15 +326,15 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
return IOU_POLL_NO_ACTION;
}
-static void io_poll_task_func(struct io_kiocb *req, bool *locked)
+static void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts)
{
int ret;
- ret = io_poll_check_events(req, locked);
+ ret = io_poll_check_events(req, ts);
if (ret == IOU_POLL_NO_ACTION)
return;
io_poll_remove_entries(req);
- io_poll_tw_hash_eject(req, locked);
+ io_poll_tw_hash_eject(req, ts);
if (req->opcode == IORING_OP_POLL_ADD) {
if (ret == IOU_POLL_DONE) {
@@ -343,7 +343,7 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
poll = io_kiocb_to_cmd(req, struct io_poll);
req->cqe.res = mangle_poll(req->cqe.res & poll->events);
} else if (ret == IOU_POLL_REISSUE) {
- io_req_task_submit(req, locked);
+ io_req_task_submit(req, ts);
return;
} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
req->cqe.res = ret;
@@ -351,14 +351,14 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
}
io_req_set_res(req, req->cqe.res, 0);
- io_req_task_complete(req, locked);
+ io_req_task_complete(req, ts);
} else {
- io_tw_lock(req->ctx, locked);
+ io_tw_lock(req->ctx, ts);
if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
- io_req_task_complete(req, locked);
+ io_req_task_complete(req, ts);
else if (ret == IOU_POLL_DONE || ret == IOU_POLL_REISSUE)
- io_req_task_submit(req, locked);
+ io_req_task_submit(req, ts);
else
io_req_defer_failed(req, ret);
}
@@ -726,6 +726,7 @@ int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
apoll = io_req_alloc_apoll(req, issue_flags);
if (!apoll)
return IO_APOLL_ABORTED;
+ req->flags &= ~(REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL);
req->flags |= REQ_F_POLLED;
ipt.pt._qproc = io_async_queue_proc;
@@ -976,7 +977,7 @@ int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags)
struct io_hash_bucket *bucket;
struct io_kiocb *preq;
int ret2, ret = 0;
- bool locked;
+ struct io_tw_state ts = {};
preq = io_poll_find(ctx, true, &cd, &ctx->cancel_table, &bucket);
ret2 = io_poll_disarm(preq);
@@ -1026,8 +1027,8 @@ found:
req_set_fail(preq);
io_req_set_res(preq, -ECANCELED, 0);
- locked = !(issue_flags & IO_URING_F_UNLOCKED);
- io_req_task_complete(preq, &locked);
+ ts.locked = !(issue_flags & IO_URING_F_UNLOCKED);
+ io_req_task_complete(preq, &ts);
out:
if (ret < 0) {
req_set_fail(req);
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index 056f40946ff6..d4c91393e0d3 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -23,25 +23,16 @@ struct io_rsrc_update {
u32 offset;
};
+static void io_rsrc_buf_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc);
+static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc);
static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
struct io_mapped_ubuf **pimu,
struct page **last_hpage);
-#define IO_RSRC_REF_BATCH 100
-
/* only define max */
#define IORING_MAX_FIXED_FILES (1U << 20)
#define IORING_MAX_REG_BUFFERS (1U << 14)
-void io_rsrc_refs_drop(struct io_ring_ctx *ctx)
- __must_hold(&ctx->uring_lock)
-{
- if (ctx->rsrc_cached_refs) {
- io_rsrc_put_node(ctx->rsrc_node, ctx->rsrc_cached_refs);
- ctx->rsrc_cached_refs = 0;
- }
-}
-
int __io_account_mem(struct user_struct *user, unsigned long nr_pages)
{
unsigned long page_limit, cur_pages, new_pages;
@@ -151,216 +142,129 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slo
*slot = NULL;
}
-void io_rsrc_refs_refill(struct io_ring_ctx *ctx)
- __must_hold(&ctx->uring_lock)
-{
- ctx->rsrc_cached_refs += IO_RSRC_REF_BATCH;
- percpu_ref_get_many(&ctx->rsrc_node->refs, IO_RSRC_REF_BATCH);
-}
-
-static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
-{
- struct io_rsrc_data *rsrc_data = ref_node->rsrc_data;
- struct io_ring_ctx *ctx = rsrc_data->ctx;
- struct io_rsrc_put *prsrc, *tmp;
-
- list_for_each_entry_safe(prsrc, tmp, &ref_node->rsrc_list, list) {
- list_del(&prsrc->list);
-
- if (prsrc->tag) {
- if (ctx->flags & IORING_SETUP_IOPOLL) {
- mutex_lock(&ctx->uring_lock);
- io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
- mutex_unlock(&ctx->uring_lock);
- } else {
- io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
- }
- }
-
- rsrc_data->do_put(ctx, prsrc);
- kfree(prsrc);
- }
-
- io_rsrc_node_destroy(ref_node);
- if (atomic_dec_and_test(&rsrc_data->refs))
- complete(&rsrc_data->done);
-}
-
-void io_rsrc_put_work(struct work_struct *work)
+static void io_rsrc_put_work(struct io_rsrc_node *node)
{
- struct io_ring_ctx *ctx;
- struct llist_node *node;
-
- ctx = container_of(work, struct io_ring_ctx, rsrc_put_work.work);
- node = llist_del_all(&ctx->rsrc_put_llist);
+ struct io_rsrc_put *prsrc = &node->item;
- while (node) {
- struct io_rsrc_node *ref_node;
- struct llist_node *next = node->next;
+ if (prsrc->tag)
+ io_post_aux_cqe(node->ctx, prsrc->tag, 0, 0);
- ref_node = llist_entry(node, struct io_rsrc_node, llist);
- __io_rsrc_put_work(ref_node);
- node = next;
+ switch (node->type) {
+ case IORING_RSRC_FILE:
+ io_rsrc_file_put(node->ctx, prsrc);
+ break;
+ case IORING_RSRC_BUFFER:
+ io_rsrc_buf_put(node->ctx, prsrc);
+ break;
+ default:
+ WARN_ON_ONCE(1);
+ break;
}
}
-void io_rsrc_put_tw(struct callback_head *cb)
-{
- struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
- rsrc_put_tw);
-
- io_rsrc_put_work(&ctx->rsrc_put_work.work);
-}
-
-void io_wait_rsrc_data(struct io_rsrc_data *data)
+void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
{
- if (data && !atomic_dec_and_test(&data->refs))
- wait_for_completion(&data->done);
+ if (!io_alloc_cache_put(&ctx->rsrc_node_cache, &node->cache))
+ kfree(node);
}
-void io_rsrc_node_destroy(struct io_rsrc_node *ref_node)
+void io_rsrc_node_ref_zero(struct io_rsrc_node *node)
+ __must_hold(&node->ctx->uring_lock)
{
- percpu_ref_exit(&ref_node->refs);
- kfree(ref_node);
-}
-
-static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref)
-{
- struct io_rsrc_node *node = container_of(ref, struct io_rsrc_node, refs);
- struct io_ring_ctx *ctx = node->rsrc_data->ctx;
- unsigned long flags;
- bool first_add = false;
- unsigned long delay = HZ;
-
- spin_lock_irqsave(&ctx->rsrc_ref_lock, flags);
- node->done = true;
-
- /* if we are mid-quiesce then do not delay */
- if (node->rsrc_data->quiesce)
- delay = 0;
+ struct io_ring_ctx *ctx = node->ctx;
while (!list_empty(&ctx->rsrc_ref_list)) {
node = list_first_entry(&ctx->rsrc_ref_list,
struct io_rsrc_node, node);
/* recycle ref nodes in order */
- if (!node->done)
+ if (node->refs)
break;
list_del(&node->node);
- first_add |= llist_add(&node->llist, &ctx->rsrc_put_llist);
- }
- spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags);
- if (!first_add)
- return;
-
- if (ctx->submitter_task) {
- if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw,
- ctx->notify_method))
- return;
+ if (likely(!node->empty))
+ io_rsrc_put_work(node);
+ io_rsrc_node_destroy(ctx, node);
}
- mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
+ if (list_empty(&ctx->rsrc_ref_list) && unlikely(ctx->rsrc_quiesce))
+ wake_up_all(&ctx->rsrc_quiesce_wq);
}
-static struct io_rsrc_node *io_rsrc_node_alloc(void)
+struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx)
{
struct io_rsrc_node *ref_node;
+ struct io_cache_entry *entry;
- ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL);
- if (!ref_node)
- return NULL;
-
- if (percpu_ref_init(&ref_node->refs, io_rsrc_node_ref_zero,
- 0, GFP_KERNEL)) {
- kfree(ref_node);
- return NULL;
- }
- INIT_LIST_HEAD(&ref_node->node);
- INIT_LIST_HEAD(&ref_node->rsrc_list);
- ref_node->done = false;
- return ref_node;
-}
-
-void io_rsrc_node_switch(struct io_ring_ctx *ctx,
- struct io_rsrc_data *data_to_kill)
- __must_hold(&ctx->uring_lock)
-{
- WARN_ON_ONCE(!ctx->rsrc_backup_node);
- WARN_ON_ONCE(data_to_kill && !ctx->rsrc_node);
-
- io_rsrc_refs_drop(ctx);
-
- if (data_to_kill) {
- struct io_rsrc_node *rsrc_node = ctx->rsrc_node;
-
- rsrc_node->rsrc_data = data_to_kill;
- spin_lock_irq(&ctx->rsrc_ref_lock);
- list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list);
- spin_unlock_irq(&ctx->rsrc_ref_lock);
-
- atomic_inc(&data_to_kill->refs);
- percpu_ref_kill(&rsrc_node->refs);
- ctx->rsrc_node = NULL;
- }
-
- if (!ctx->rsrc_node) {
- ctx->rsrc_node = ctx->rsrc_backup_node;
- ctx->rsrc_backup_node = NULL;
+ entry = io_alloc_cache_get(&ctx->rsrc_node_cache);
+ if (entry) {
+ ref_node = container_of(entry, struct io_rsrc_node, cache);
+ } else {
+ ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL);
+ if (!ref_node)
+ return NULL;
}
-}
-int io_rsrc_node_switch_start(struct io_ring_ctx *ctx)
-{
- if (ctx->rsrc_backup_node)
- return 0;
- ctx->rsrc_backup_node = io_rsrc_node_alloc();
- return ctx->rsrc_backup_node ? 0 : -ENOMEM;
+ ref_node->ctx = ctx;
+ ref_node->empty = 0;
+ ref_node->refs = 1;
+ return ref_node;
}
__cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
struct io_ring_ctx *ctx)
{
+ struct io_rsrc_node *backup;
+ DEFINE_WAIT(we);
int ret;
/* As we may drop ->uring_lock, other task may have started quiesce */
if (data->quiesce)
return -ENXIO;
- ret = io_rsrc_node_switch_start(ctx);
- if (ret)
- return ret;
- io_rsrc_node_switch(ctx, data);
- /* kill initial ref, already quiesced if zero */
- if (atomic_dec_and_test(&data->refs))
+ backup = io_rsrc_node_alloc(ctx);
+ if (!backup)
+ return -ENOMEM;
+ ctx->rsrc_node->empty = true;
+ ctx->rsrc_node->type = -1;
+ list_add_tail(&ctx->rsrc_node->node, &ctx->rsrc_ref_list);
+ io_put_rsrc_node(ctx, ctx->rsrc_node);
+ ctx->rsrc_node = backup;
+
+ if (list_empty(&ctx->rsrc_ref_list))
return 0;
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ atomic_set(&ctx->cq_wait_nr, 1);
+ smp_mb();
+ }
+
+ ctx->rsrc_quiesce++;
data->quiesce = true;
- mutex_unlock(&ctx->uring_lock);
do {
+ prepare_to_wait(&ctx->rsrc_quiesce_wq, &we, TASK_INTERRUPTIBLE);
+ mutex_unlock(&ctx->uring_lock);
+
ret = io_run_task_work_sig(ctx);
if (ret < 0) {
- atomic_inc(&data->refs);
- /* wait for all works potentially completing data->done */
- flush_delayed_work(&ctx->rsrc_put_work);
- reinit_completion(&data->done);
mutex_lock(&ctx->uring_lock);
+ if (list_empty(&ctx->rsrc_ref_list))
+ ret = 0;
break;
}
- flush_delayed_work(&ctx->rsrc_put_work);
- ret = wait_for_completion_interruptible(&data->done);
- if (!ret) {
- mutex_lock(&ctx->uring_lock);
- if (atomic_read(&data->refs) <= 0)
- break;
- /*
- * it has been revived by another thread while
- * we were unlocked
- */
- mutex_unlock(&ctx->uring_lock);
- }
- } while (1);
+ schedule();
+ __set_current_state(TASK_RUNNING);
+ mutex_lock(&ctx->uring_lock);
+ ret = 0;
+ } while (!list_empty(&ctx->rsrc_ref_list));
+
+ finish_wait(&ctx->rsrc_quiesce_wq, &we);
data->quiesce = false;
+ ctx->rsrc_quiesce--;
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ atomic_set(&ctx->cq_wait_nr, 0);
+ smp_mb();
+ }
return ret;
}
@@ -405,12 +309,12 @@ static __cold void **io_alloc_page_table(size_t size)
return table;
}
-__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
- rsrc_put_fn *do_put, u64 __user *utags,
+__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx, int type,
+ u64 __user *utags,
unsigned nr, struct io_rsrc_data **pdata)
{
struct io_rsrc_data *data;
- int ret = -ENOMEM;
+ int ret = 0;
unsigned i;
data = kzalloc(sizeof(*data), GFP_KERNEL);
@@ -424,7 +328,7 @@ __cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
data->nr = nr;
data->ctx = ctx;
- data->do_put = do_put;
+ data->rsrc_type = type;
if (utags) {
ret = -EFAULT;
for (i = 0; i < nr; i++) {
@@ -435,9 +339,6 @@ __cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
goto fail;
}
}
-
- atomic_set(&data->refs, 1);
- init_completion(&data->done);
*pdata = data;
return 0;
fail:
@@ -456,7 +357,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
struct file *file;
int fd, i, err = 0;
unsigned int done;
- bool needs_switch = false;
if (!ctx->file_data)
return -ENXIO;
@@ -483,12 +383,11 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
if (file_slot->file_ptr) {
file = (struct file *)(file_slot->file_ptr & FFS_MASK);
- err = io_queue_rsrc_removal(data, i, ctx->rsrc_node, file);
+ err = io_queue_rsrc_removal(data, i, file);
if (err)
break;
file_slot->file_ptr = 0;
io_file_bitmap_clear(&ctx->file_table, i);
- needs_switch = true;
}
if (fd != -1) {
file = fget(fd);
@@ -519,9 +418,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
io_file_bitmap_set(&ctx->file_table, i);
}
}
-
- if (needs_switch)
- io_rsrc_node_switch(ctx, data);
return done ? done : err;
}
@@ -532,7 +428,6 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
u64 __user *tags = u64_to_user_ptr(up->tags);
struct iovec iov, __user *iovs = u64_to_user_ptr(up->data);
struct page *last_hpage = NULL;
- bool needs_switch = false;
__u32 done;
int i, err;
@@ -543,7 +438,6 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
for (done = 0; done < nr_args; done++) {
struct io_mapped_ubuf *imu;
- int offset = up->offset + done;
u64 tag = 0;
err = io_copy_iov(ctx, &iov, iovs, done);
@@ -564,24 +458,20 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
if (err)
break;
- i = array_index_nospec(offset, ctx->nr_user_bufs);
+ i = array_index_nospec(up->offset + done, ctx->nr_user_bufs);
if (ctx->user_bufs[i] != ctx->dummy_ubuf) {
err = io_queue_rsrc_removal(ctx->buf_data, i,
- ctx->rsrc_node, ctx->user_bufs[i]);
+ ctx->user_bufs[i]);
if (unlikely(err)) {
io_buffer_unmap(ctx, &imu);
break;
}
ctx->user_bufs[i] = ctx->dummy_ubuf;
- needs_switch = true;
}
ctx->user_bufs[i] = imu;
- *io_get_tag_slot(ctx->buf_data, offset) = tag;
+ *io_get_tag_slot(ctx->buf_data, i) = tag;
}
-
- if (needs_switch)
- io_rsrc_node_switch(ctx, ctx->buf_data);
return done ? done : err;
}
@@ -590,13 +480,11 @@ static int __io_register_rsrc_update(struct io_ring_ctx *ctx, unsigned type,
unsigned nr_args)
{
__u32 tmp;
- int err;
+
+ lockdep_assert_held(&ctx->uring_lock);
if (check_add_overflow(up->offset, nr_args, &tmp))
return -EOVERFLOW;
- err = io_rsrc_node_switch_start(ctx);
- if (err)
- return err;
switch (type) {
case IORING_RSRC_FILE:
@@ -753,20 +641,24 @@ int io_files_update(struct io_kiocb *req, unsigned int issue_flags)
return IOU_OK;
}
-int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx,
- struct io_rsrc_node *node, void *rsrc)
+int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, void *rsrc)
{
+ struct io_ring_ctx *ctx = data->ctx;
+ struct io_rsrc_node *node = ctx->rsrc_node;
u64 *tag_slot = io_get_tag_slot(data, idx);
- struct io_rsrc_put *prsrc;
- prsrc = kzalloc(sizeof(*prsrc), GFP_KERNEL);
- if (!prsrc)
+ ctx->rsrc_node = io_rsrc_node_alloc(ctx);
+ if (unlikely(!ctx->rsrc_node)) {
+ ctx->rsrc_node = node;
return -ENOMEM;
+ }
- prsrc->tag = *tag_slot;
+ node->item.rsrc = rsrc;
+ node->type = data->rsrc_type;
+ node->item.tag = *tag_slot;
*tag_slot = 0;
- prsrc->rsrc = rsrc;
- list_add(&prsrc->list, &node->rsrc_list);
+ list_add_tail(&node->node, &ctx->rsrc_ref_list);
+ io_put_rsrc_node(ctx, node);
return 0;
}
@@ -794,6 +686,7 @@ void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
}
#endif
io_free_file_tables(&ctx->file_table);
+ io_file_table_set_alloc_range(ctx, 0, 0);
io_rsrc_data_free(ctx->file_data);
ctx->file_data = NULL;
ctx->nr_user_files = 0;
@@ -867,8 +760,7 @@ int __io_scm_file_account(struct io_ring_ctx *ctx, struct file *file)
UNIXCB(skb).fp = fpl;
skb->sk = sk;
- skb->scm_io_uring = 1;
- skb->destructor = unix_destruct_scm;
+ skb->destructor = io_uring_destruct_scm;
refcount_add(skb->truesize, &sk->sk_wmem_alloc);
}
@@ -881,20 +773,14 @@ int __io_scm_file_account(struct io_ring_ctx *ctx, struct file *file)
return 0;
}
-static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
+static __cold void io_rsrc_file_scm_put(struct io_ring_ctx *ctx, struct file *file)
{
- struct file *file = prsrc->file;
#if defined(CONFIG_UNIX)
struct sock *sock = ctx->ring_sock->sk;
struct sk_buff_head list, *head = &sock->sk_receive_queue;
struct sk_buff *skb;
int i;
- if (!io_file_need_scm(file)) {
- fput(file);
- return;
- }
-
__skb_queue_head_init(&list);
/*
@@ -944,11 +830,19 @@ static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
__skb_queue_tail(head, skb);
spin_unlock_irq(&head->lock);
}
-#else
- fput(file);
#endif
}
+static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
+{
+ struct file *file = prsrc->file;
+
+ if (likely(!io_file_need_scm(file)))
+ fput(file);
+ else
+ io_rsrc_file_scm_put(ctx, file);
+}
+
int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
unsigned nr_args, u64 __user *tags)
{
@@ -965,10 +859,7 @@ int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
return -EMFILE;
if (nr_args > rlimit(RLIMIT_NOFILE))
return -EMFILE;
- ret = io_rsrc_node_switch_start(ctx);
- if (ret)
- return ret;
- ret = io_rsrc_data_alloc(ctx, io_rsrc_file_put, tags, nr_args,
+ ret = io_rsrc_data_alloc(ctx, IORING_RSRC_FILE, tags, nr_args,
&ctx->file_data);
if (ret)
return ret;
@@ -1022,7 +913,6 @@ int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
/* default it to the whole table */
io_file_table_set_alloc_range(ctx, 0, ctx->nr_user_files);
- io_rsrc_node_switch(ctx, NULL);
return 0;
fail:
__io_sqe_files_unregister(ctx);
@@ -1162,17 +1052,14 @@ struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages)
pret = pin_user_pages(ubuf, nr_pages, FOLL_WRITE | FOLL_LONGTERM,
pages, vmas);
if (pret == nr_pages) {
- struct file *file = vmas[0]->vm_file;
-
/* don't support file backed memory */
for (i = 0; i < nr_pages; i++) {
- if (vmas[i]->vm_file != file) {
- ret = -EINVAL;
- break;
- }
- if (!file)
+ struct vm_area_struct *vma = vmas[i];
+
+ if (vma_is_shmem(vma))
continue;
- if (!vma_is_shmem(vmas[i]) && !is_file_hugepages(file)) {
+ if (vma->vm_file &&
+ !is_file_hugepages(vma->vm_file)) {
ret = -EOPNOTSUPP;
break;
}
@@ -1235,7 +1122,13 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
}
}
if (folio) {
- folio_put_refs(folio, nr_pages - 1);
+ /*
+ * The pages are bound to the folio; this doesn't
+ * actually unpin them but drops all but one reference,
+ * which is usually put down by io_buffer_unmap().
+ * Note, needs a better helper.
+ */
+ unpin_user_pages(&pages[1], nr_pages - 1);
nr_pages = 1;
}
}
@@ -1298,10 +1191,7 @@ int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
return -EBUSY;
if (!nr_args || nr_args > IORING_MAX_REG_BUFFERS)
return -EINVAL;
- ret = io_rsrc_node_switch_start(ctx);
- if (ret)
- return ret;
- ret = io_rsrc_data_alloc(ctx, io_rsrc_buf_put, tags, nr_args, &data);
+ ret = io_rsrc_data_alloc(ctx, IORING_RSRC_BUFFER, tags, nr_args, &data);
if (ret)
return ret;
ret = io_buffers_map_alloc(ctx, nr_args);
@@ -1338,8 +1228,6 @@ int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
ctx->buf_data = data;
if (ret)
__io_sqe_buffers_unregister(ctx);
- else
- io_rsrc_node_switch(ctx, NULL);
return ret;
}
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 2b8743645efc..0a8a95e9b99e 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -4,6 +4,10 @@
#include <net/af_unix.h>
+#include "alloc_cache.h"
+
+#define IO_NODE_ALLOC_CACHE_MAX 32
+
#define IO_RSRC_TAG_TABLE_SHIFT (PAGE_SHIFT - 3)
#define IO_RSRC_TAG_TABLE_MAX (1U << IO_RSRC_TAG_TABLE_SHIFT)
#define IO_RSRC_TAG_TABLE_MASK (IO_RSRC_TAG_TABLE_MAX - 1)
@@ -14,7 +18,6 @@ enum {
};
struct io_rsrc_put {
- struct list_head list;
u64 tag;
union {
void *rsrc;
@@ -30,19 +33,20 @@ struct io_rsrc_data {
u64 **tags;
unsigned int nr;
- rsrc_put_fn *do_put;
- atomic_t refs;
- struct completion done;
+ u16 rsrc_type;
bool quiesce;
};
struct io_rsrc_node {
- struct percpu_ref refs;
+ union {
+ struct io_cache_entry cache;
+ struct io_ring_ctx *ctx;
+ };
+ int refs;
+ bool empty;
+ u16 type;
struct list_head node;
- struct list_head rsrc_list;
- struct io_rsrc_data *rsrc_data;
- struct llist_node llist;
- bool done;
+ struct io_rsrc_put item;
};
struct io_mapped_ubuf {
@@ -54,16 +58,10 @@ struct io_mapped_ubuf {
};
void io_rsrc_put_tw(struct callback_head *cb);
-void io_rsrc_put_work(struct work_struct *work);
-void io_rsrc_refs_refill(struct io_ring_ctx *ctx);
-void io_wait_rsrc_data(struct io_rsrc_data *data);
-void io_rsrc_node_destroy(struct io_rsrc_node *ref_node);
-void io_rsrc_refs_drop(struct io_ring_ctx *ctx);
-int io_rsrc_node_switch_start(struct io_ring_ctx *ctx);
-int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx,
- struct io_rsrc_node *node, void *rsrc);
-void io_rsrc_node_switch(struct io_ring_ctx *ctx,
- struct io_rsrc_data *data_to_kill);
+void io_rsrc_node_ref_zero(struct io_rsrc_node *node);
+void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *ref_node);
+struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx);
+int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, void *rsrc);
int io_import_fixed(int ddir, struct iov_iter *iter,
struct io_mapped_ubuf *imu,
@@ -107,36 +105,24 @@ int io_register_rsrc_update(struct io_ring_ctx *ctx, void __user *arg,
int io_register_rsrc(struct io_ring_ctx *ctx, void __user *arg,
unsigned int size, unsigned int type);
-static inline void io_rsrc_put_node(struct io_rsrc_node *node, int nr)
+static inline void io_put_rsrc_node(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
{
- percpu_ref_put_many(&node->refs, nr);
-}
+ lockdep_assert_held(&ctx->uring_lock);
-static inline void io_req_put_rsrc(struct io_kiocb *req)
-{
- if (req->rsrc_node)
- io_rsrc_put_node(req->rsrc_node, 1);
+ if (node && !--node->refs)
+ io_rsrc_node_ref_zero(node);
}
static inline void io_req_put_rsrc_locked(struct io_kiocb *req,
struct io_ring_ctx *ctx)
- __must_hold(&ctx->uring_lock)
{
- struct io_rsrc_node *node = req->rsrc_node;
-
- if (node) {
- if (node == ctx->rsrc_node)
- ctx->rsrc_cached_refs++;
- else
- io_rsrc_put_node(node, 1);
- }
+ io_put_rsrc_node(ctx, req->rsrc_node);
}
-static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx)
+static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx,
+ struct io_rsrc_node *node)
{
- ctx->rsrc_cached_refs--;
- if (unlikely(ctx->rsrc_cached_refs < 0))
- io_rsrc_refs_refill(ctx);
+ node->refs++;
}
static inline void io_req_set_rsrc_node(struct io_kiocb *req,
@@ -144,15 +130,13 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req,
unsigned int issue_flags)
{
if (!req->rsrc_node) {
- req->rsrc_node = ctx->rsrc_node;
+ io_ring_submit_lock(ctx, issue_flags);
- if (!(issue_flags & IO_URING_F_UNLOCKED)) {
- lockdep_assert_held(&ctx->uring_lock);
+ lockdep_assert_held(&ctx->uring_lock);
- io_charge_rsrc_node(ctx);
- } else {
- percpu_ref_get(&req->rsrc_node->refs);
- }
+ req->rsrc_node = ctx->rsrc_node;
+ io_charge_rsrc_node(ctx, ctx->rsrc_node);
+ io_ring_submit_unlock(ctx, issue_flags);
}
}
@@ -164,6 +148,12 @@ static inline u64 *io_get_tag_slot(struct io_rsrc_data *data, unsigned int idx)
return &data->tags[table_idx][off];
}
+static inline int io_rsrc_init(struct io_ring_ctx *ctx)
+{
+ ctx->rsrc_node = io_rsrc_node_alloc(ctx);
+ return ctx->rsrc_node ? 0 : -ENOMEM;
+}
+
int io_files_update(struct io_kiocb *req, unsigned int issue_flags);
int io_files_update_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
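
A hedged sketch of the per-request lifecycle after this rework, using only helpers declared above; the wrapper function is illustrative, not part of the series. The old percpu_ref and cached-refs dance collapses into a plain integer owned by ->uring_lock:

	static void example_rsrc_pin_and_release(struct io_ring_ctx *ctx,
						 struct io_kiocb *req)
		__must_hold(&ctx->uring_lock)
	{
		/* issue side: pin the currently active node, a bare increment */
		req->rsrc_node = ctx->rsrc_node;
		io_charge_rsrc_node(ctx, ctx->rsrc_node);

		/* ... the request runs, possibly long after the node is retired ... */

		/* completion side: drop the pin; when refs hits zero the node is
		 * handled in ->rsrc_ref_list order and recycled via rsrc_node_cache */
		io_put_rsrc_node(ctx, req->rsrc_node);
		req->rsrc_node = NULL;
	}
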
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 4c233910e200..3f118ed46e4f 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -283,16 +283,16 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
return res;
}
-static void io_req_rw_complete(struct io_kiocb *req, bool *locked)
+static void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
{
io_req_io_end(req);
if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
- unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
+ unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
req->cqe.flags |= io_put_kbuf(req, issue_flags);
}
- io_req_task_complete(req, locked);
+ io_req_task_complete(req, ts);
}
static void io_complete_rw(struct kiocb *kiocb, long res)
@@ -304,7 +304,7 @@ static void io_complete_rw(struct kiocb *kiocb, long res)
return;
io_req_set_res(req, io_fixup_rw_res(req, res), 0);
req->io_task_work.func = io_req_rw_complete;
- io_req_task_work_add(req);
+ __io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
}
static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
@@ -447,26 +447,25 @@ static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
ppos = io_kiocb_ppos(kiocb);
while (iov_iter_count(iter)) {
- struct iovec iovec;
+ void __user *addr;
+ size_t len;
ssize_t nr;
if (iter_is_ubuf(iter)) {
- iovec.iov_base = iter->ubuf + iter->iov_offset;
- iovec.iov_len = iov_iter_count(iter);
+ addr = iter->ubuf + iter->iov_offset;
+ len = iov_iter_count(iter);
} else if (!iov_iter_is_bvec(iter)) {
- iovec = iov_iter_iovec(iter);
+ addr = iter_iov_addr(iter);
+ len = iter_iov_len(iter);
} else {
- iovec.iov_base = u64_to_user_ptr(rw->addr);
- iovec.iov_len = rw->len;
+ addr = u64_to_user_ptr(rw->addr);
+ len = rw->len;
}
- if (ddir == READ) {
- nr = file->f_op->read(file, iovec.iov_base,
- iovec.iov_len, ppos);
- } else {
- nr = file->f_op->write(file, iovec.iov_base,
- iovec.iov_len, ppos);
- }
+ if (ddir == READ)
+ nr = file->f_op->read(file, addr, len, ppos);
+ else
+ nr = file->f_op->write(file, addr, len, ppos);
if (nr < 0) {
if (!ret)
@@ -482,7 +481,7 @@ static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
if (!rw->len)
break;
}
- if (nr != iovec.iov_len)
+ if (nr != len)
break;
}
@@ -503,10 +502,10 @@ static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
if (!iovec) {
unsigned iov_off = 0;
- io->s.iter.iov = io->s.fast_iov;
- if (iter->iov != fast_iov) {
- iov_off = iter->iov - fast_iov;
- io->s.iter.iov += iov_off;
+ io->s.iter.__iov = io->s.fast_iov;
+ if (iter->__iov != fast_iov) {
+ iov_off = iter_iov(iter) - fast_iov;
+ io->s.iter.__iov += iov_off;
}
if (io->s.fast_iov != fast_iov)
memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,
@@ -1002,7 +1001,7 @@ void io_rw_fail(struct io_kiocb *req)
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
{
struct io_wq_work_node *pos, *start, *prev;
- unsigned int poll_flags = BLK_POLL_NOSLEEP;
+ unsigned int poll_flags = 0;
DEFINE_IO_COMP_BATCH(iob);
int nr_events = 0;
diff --git a/io_uring/slist.h b/io_uring/slist.h
index 7c198a40d5f1..0eb194817242 100644
--- a/io_uring/slist.h
+++ b/io_uring/slist.h
@@ -3,6 +3,9 @@
#include <linux/io_uring_types.h>
+#define __wq_list_for_each(pos, head) \
+ for (pos = (head)->first; pos; pos = (pos)->next)
+
#define wq_list_for_each(pos, prv, head) \
for (pos = (head)->first, prv = NULL; pos; prv = pos, pos = (pos)->next)
@@ -113,4 +116,4 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work)
return container_of(work->list.next, struct io_wq_work, list);
}
-#endif // INTERNAL_IO_SLIST_H
\ No newline at end of file
+#endif // INTERNAL_IO_SLIST_H
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 0119d3f1a556..9db4bc1f521a 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -233,7 +233,6 @@ static int io_sq_thread(void *data)
set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
else
set_cpus_allowed_ptr(current, cpu_online_mask);
- current->flags |= PF_NO_SETAFFINITY;
mutex_lock(&sqd->lock);
while (1) {
diff --git a/io_uring/timeout.c b/io_uring/timeout.c
index 826a51bca3e4..fc950177e2e1 100644
--- a/io_uring/timeout.c
+++ b/io_uring/timeout.c
@@ -17,6 +17,7 @@ struct io_timeout {
struct file *file;
u32 off;
u32 target_seq;
+ u32 repeats;
struct list_head list;
/* head of the link, used by linked timeouts only */
struct io_kiocb *head;
@@ -37,8 +38,9 @@ struct io_timeout_rem {
static inline bool io_is_timeout_noseq(struct io_kiocb *req)
{
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
+ struct io_timeout_data *data = req->async_data;
- return !timeout->off;
+ return !timeout->off || data->flags & IORING_TIMEOUT_MULTISHOT;
}
static inline void io_put_req(struct io_kiocb *req)
@@ -49,6 +51,44 @@ static inline void io_put_req(struct io_kiocb *req)
}
}
+static inline bool io_timeout_finish(struct io_timeout *timeout,
+ struct io_timeout_data *data)
+{
+ if (!(data->flags & IORING_TIMEOUT_MULTISHOT))
+ return true;
+
+ if (!timeout->off || (timeout->repeats && --timeout->repeats))
+ return false;
+
+ return true;
+}
+
+static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer);
+
+static void io_timeout_complete(struct io_kiocb *req, struct io_tw_state *ts)
+{
+ struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
+ struct io_timeout_data *data = req->async_data;
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (!io_timeout_finish(timeout, data)) {
+ bool filled;
+ filled = io_aux_cqe(ctx, ts->locked, req->cqe.user_data, -ETIME,
+ IORING_CQE_F_MORE, false);
+ if (filled) {
+ /* re-arm timer */
+ spin_lock_irq(&ctx->timeout_lock);
+ list_add(&timeout->list, ctx->timeout_list.prev);
+ data->timer.function = io_timeout_fn;
+ hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
+ spin_unlock_irq(&ctx->timeout_lock);
+ return;
+ }
+ }
+
+ io_req_task_complete(req, ts);
+}
+
static bool io_kill_timeout(struct io_kiocb *req, int status)
__must_hold(&req->ctx->timeout_lock)
{
@@ -101,9 +141,9 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx)
spin_unlock_irq(&ctx->timeout_lock);
}
-static void io_req_tw_fail_links(struct io_kiocb *link, bool *locked)
+static void io_req_tw_fail_links(struct io_kiocb *link, struct io_tw_state *ts)
{
- io_tw_lock(link->ctx, locked);
+ io_tw_lock(link->ctx, ts);
while (link) {
struct io_kiocb *nxt = link->link;
long res = -ECANCELED;
@@ -112,7 +152,7 @@ static void io_req_tw_fail_links(struct io_kiocb *link, bool *locked)
res = link->cqe.res;
link->link = NULL;
io_req_set_res(link, res, 0);
- io_req_task_complete(link, locked);
+ io_req_task_complete(link, ts);
link = nxt;
}
}
@@ -212,7 +252,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
req_set_fail(req);
io_req_set_res(req, -ETIME, 0);
- req->io_task_work.func = io_req_task_complete;
+ req->io_task_work.func = io_timeout_complete;
io_req_task_work_add(req);
return HRTIMER_NORESTART;
}
@@ -265,9 +305,9 @@ int io_timeout_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd)
return 0;
}
-static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
+static void io_req_task_link_timeout(struct io_kiocb *req, struct io_tw_state *ts)
{
- unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
+ unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
struct io_kiocb *prev = timeout->prev;
int ret = -ENOENT;
@@ -282,11 +322,11 @@ static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
ret = io_try_cancel(req->task->io_uring, &cd, issue_flags);
}
io_req_set_res(req, ret ?: -ETIME, 0);
- io_req_task_complete(req, locked);
+ io_req_task_complete(req, ts);
io_put_req(prev);
} else {
io_req_set_res(req, -ETIME, 0);
- io_req_task_complete(req, locked);
+ io_req_task_complete(req, ts);
}
}
@@ -470,16 +510,27 @@ static int __io_timeout_prep(struct io_kiocb *req,
return -EINVAL;
flags = READ_ONCE(sqe->timeout_flags);
if (flags & ~(IORING_TIMEOUT_ABS | IORING_TIMEOUT_CLOCK_MASK |
- IORING_TIMEOUT_ETIME_SUCCESS))
+ IORING_TIMEOUT_ETIME_SUCCESS |
+ IORING_TIMEOUT_MULTISHOT))
return -EINVAL;
/* more than one clock specified is invalid, obviously */
if (hweight32(flags & IORING_TIMEOUT_CLOCK_MASK) > 1)
return -EINVAL;
+ /* multishot requests only make sense with rel values */
+ if (!(~flags & (IORING_TIMEOUT_MULTISHOT | IORING_TIMEOUT_ABS)))
+ return -EINVAL;
INIT_LIST_HEAD(&timeout->list);
timeout->off = off;
if (unlikely(off && !req->ctx->off_timeout_used))
req->ctx->off_timeout_used = true;
+ /*
+ * for multishot reqs w/ fixed nr of repeats, repeats tracks the
+ * remaining nr
+ */
+ timeout->repeats = 0;
+ if ((flags & IORING_TIMEOUT_MULTISHOT) && off > 0)
+ timeout->repeats = off;
if (WARN_ON_ONCE(req_has_async_data(req)))
return -EFAULT;
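With the prep changes above, a multishot timeout must use a relative timespec, and a non-zero off is reinterpreted as the number of repeats. A userspace sketch with liburing, assuming headers that already define IORING_TIMEOUT_MULTISHOT as introduced here; it expects three -ETIME completions, the first two flagged IORING_CQE_F_MORE:

#include <liburing.h>
#include <stdio.h>

int main(void)
{
        struct io_uring ring;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100 * 1000 * 1000 };
        int i;

        if (io_uring_queue_init(8, &ring, 0))
                return 1;

        sqe = io_uring_get_sqe(&ring);
        /* off/count = 3 -> timeout->repeats = 3 in __io_timeout_prep() */
        io_uring_prep_timeout(sqe, &ts, 3, IORING_TIMEOUT_MULTISHOT);
        io_uring_submit(&ring);

        for (i = 0; i < 3; i++) {
                if (io_uring_wait_cqe(&ring, &cqe))
                        break;
                printf("res=%d more=%d\n", cqe->res,
                       !!(cqe->flags & IORING_CQE_F_MORE));
                io_uring_cqe_seen(&ring, cqe);
        }
        io_uring_queue_exit(&ring);
        return 0;
}
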
diff --git a/io_uring/uring_cmd.c b/io_uring/uring_cmd.c
index 446a189b78b0..5113c9a48583 100644
--- a/io_uring/uring_cmd.c
+++ b/io_uring/uring_cmd.c
@@ -12,15 +12,16 @@
#include "rsrc.h"
#include "uring_cmd.h"
-static void io_uring_cmd_work(struct io_kiocb *req, bool *locked)
+static void io_uring_cmd_work(struct io_kiocb *req, struct io_tw_state *ts)
{
struct io_uring_cmd *ioucmd = io_kiocb_to_cmd(req, struct io_uring_cmd);
+ unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
- ioucmd->task_work_cb(ioucmd);
+ ioucmd->task_work_cb(ioucmd, issue_flags);
}
void io_uring_cmd_complete_in_task(struct io_uring_cmd *ioucmd,
- void (*task_work_cb)(struct io_uring_cmd *))
+ void (*task_work_cb)(struct io_uring_cmd *, unsigned))
{
struct io_kiocb *req = cmd_to_io_kiocb(ioucmd);
@@ -42,7 +43,8 @@ static inline void io_req_set_cqe32_extra(struct io_kiocb *req,
* Called by consumers of io_uring_cmd, if they originally returned
* -EIOCBQUEUED upon receiving the command.
*/
-void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2)
+void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2,
+ unsigned issue_flags)
{
struct io_kiocb *req = cmd_to_io_kiocb(ioucmd);
@@ -52,11 +54,15 @@ void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2)
io_req_set_res(req, ret, 0);
if (req->ctx->flags & IORING_SETUP_CQE32)
io_req_set_cqe32_extra(req, res2, 0);
- if (req->ctx->flags & IORING_SETUP_IOPOLL)
+ if (req->ctx->flags & IORING_SETUP_IOPOLL) {
/* order with io_iopoll_req_issued() checking ->iopoll_complete */
smp_store_release(&req->iopoll_completed, 1);
- else
- io_req_complete_post(req, 0);
+ } else {
+ struct io_tw_state ts = {
+ .locked = !(issue_flags & IO_URING_F_UNLOCKED),
+ };
+ io_req_task_complete(req, &ts);
+ }
}
EXPORT_SYMBOL_GPL(io_uring_cmd_done);
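Consumers of the uring_cmd API now receive issue_flags in their io_uring_cmd_complete_in_task() callback and pass them on to io_uring_cmd_done(), so the completion path knows whether the ring is already locked. A hypothetical driver-side sketch of the updated calling convention; the my_* names are illustrative, only the two exported helpers come from this patch:

#include <linux/io_uring.h>

static void my_cmd_tw_cb(struct io_uring_cmd *ioucmd, unsigned issue_flags)
{
        /* issue_flags tells the callback whether the ring lock is held */
        io_uring_cmd_done(ioucmd, 0 /* ret */, 0 /* res2 */, issue_flags);
}

static void my_irq_completion(struct io_uring_cmd *ioucmd)
{
        /* defer completion to task context with the two-argument callback */
        io_uring_cmd_complete_in_task(ioucmd, my_cmd_tw_cb);
}
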
@@ -71,6 +77,7 @@ int io_uring_cmd_prep_async(struct io_kiocb *req)
cmd_size = uring_cmd_pdu_size(req->ctx->flags & IORING_SETUP_SQE128);
memcpy(req->async_data, ioucmd->cmd, cmd_size);
+ ioucmd->cmd = req->async_data;
return 0;
}
@@ -108,7 +115,7 @@ int io_uring_cmd(struct io_kiocb *req, unsigned int issue_flags)
struct file *file = req->file;
int ret;
- if (!req->file->f_op->uring_cmd)
+ if (!file->f_op->uring_cmd)
return -EOPNOTSUPP;
ret = security_uring_cmd(ioucmd);
@@ -120,14 +127,13 @@ int io_uring_cmd(struct io_kiocb *req, unsigned int issue_flags)
if (ctx->flags & IORING_SETUP_CQE32)
issue_flags |= IO_URING_F_CQE32;
if (ctx->flags & IORING_SETUP_IOPOLL) {
+ if (!file->f_op->uring_cmd_iopoll)
+ return -EOPNOTSUPP;
issue_flags |= IO_URING_F_IOPOLL;
req->iopoll_completed = 0;
WRITE_ONCE(ioucmd->cookie, NULL);
}
- if (req_has_async_data(req))
- ioucmd->cmd = req->async_data;
-
ret = file->f_op->uring_cmd(ioucmd, issue_flags);
if (ret == -EAGAIN) {
if (!req_has_async_data(req)) {