diff options
Diffstat (limited to 'io_uring/io-wq.c')
-rw-r--r-- | io_uring/io-wq.c | 259 |
1 files changed, 153 insertions, 106 deletions
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index a38f36b68060..0825c1b30f8f 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -30,7 +30,6 @@ enum { IO_WORKER_F_UP = 0, /* up and active */ IO_WORKER_F_RUNNING = 1, /* account as running */ IO_WORKER_F_FREE = 2, /* worker on free list */ - IO_WORKER_F_BOUND = 3, /* is doing bounded work */ }; enum { @@ -46,12 +45,12 @@ enum { */ struct io_worker { refcount_t ref; - int create_index; unsigned long flags; struct hlist_nulls_node nulls_node; struct list_head all_list; struct task_struct *task; struct io_wq *wq; + struct io_wq_acct *acct; struct io_wq_work *cur_work; raw_spinlock_t lock; @@ -64,7 +63,7 @@ struct io_worker { union { struct rcu_head rcu; - struct work_struct work; + struct delayed_work work; }; }; @@ -77,10 +76,27 @@ struct io_worker { #define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER) struct io_wq_acct { + /** + * Protects access to the worker lists. + */ + raw_spinlock_t workers_lock; + unsigned nr_workers; unsigned max_workers; - int index; atomic_t nr_running; + + /** + * The list of free workers. Protected by #workers_lock + * (write) and RCU (read). + */ + struct hlist_nulls_head free_list; + + /** + * The list of all workers. Protected by #workers_lock + * (write) and RCU (read). + */ + struct list_head all_list; + raw_spinlock_t lock; struct io_wq_work_list work_list; unsigned long flags; @@ -112,12 +128,6 @@ struct io_wq { struct io_wq_acct acct[IO_WQ_ACCT_NR]; - /* lock protects access to elements below */ - raw_spinlock_t lock; - - struct hlist_nulls_head free_list; - struct list_head all_list; - struct wait_queue_entry wait; struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; @@ -135,7 +145,7 @@ struct io_cb_cancel_data { bool cancel_all; }; -static bool create_io_worker(struct io_wq *wq, int index); +static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct); static void io_wq_dec_running(struct io_worker *worker); static bool io_acct_cancel_pending_work(struct io_wq *wq, struct io_wq_acct *acct, @@ -160,14 +170,14 @@ static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound) } static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq, - struct io_wq_work *work) + unsigned int work_flags) { - return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND)); + return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND)); } static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker) { - return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags)); + return worker->acct; } static void io_worker_ref_put(struct io_wq *wq) @@ -192,9 +202,9 @@ static void io_worker_cancel_cb(struct io_worker *worker) struct io_wq *wq = worker->wq; atomic_dec(&acct->nr_running); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); acct->nr_workers--; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); io_worker_ref_put(wq); clear_bit_unlock(0, &worker->create_state); io_worker_release(worker); @@ -213,6 +223,7 @@ static bool io_task_worker_match(struct callback_head *cb, void *data) static void io_worker_exit(struct io_worker *worker) { struct io_wq *wq = worker->wq; + struct io_wq_acct *acct = io_wq_get_acct(worker); while (1) { struct callback_head *cb = task_work_cancel_match(wq->task, @@ -226,11 +237,11 @@ static void io_worker_exit(struct io_worker *worker) io_worker_release(worker); wait_for_completion(&worker->ref_done); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); if (test_bit(IO_WORKER_F_FREE, &worker->flags)) hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); io_wq_dec_running(worker); /* * this worker is a goner, clear ->worker_private to avoid any @@ -269,8 +280,7 @@ static inline bool io_acct_run_queue(struct io_wq_acct *acct) * Check head of free list for an available worker. If one isn't available, * caller must create one. */ -static bool io_wq_activate_free_worker(struct io_wq *wq, - struct io_wq_acct *acct) +static bool io_acct_activate_free_worker(struct io_wq_acct *acct) __must_hold(RCU) { struct hlist_nulls_node *n; @@ -281,13 +291,9 @@ static bool io_wq_activate_free_worker(struct io_wq *wq, * activate. If a given worker is on the free_list but in the process * of exiting, keep trying. */ - hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) { + hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) { if (!io_worker_get(worker)) continue; - if (io_wq_get_acct(worker) != acct) { - io_worker_release(worker); - continue; - } /* * If the worker is already running, it's either already * starting work or finishing work. In either case, if it does @@ -314,16 +320,16 @@ static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct) if (unlikely(!acct->max_workers)) pr_warn_once("io-wq is not configured for unbound workers"); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); if (acct->nr_workers >= acct->max_workers) { - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); return true; } acct->nr_workers++; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); atomic_inc(&acct->nr_running); atomic_inc(&wq->worker_refs); - return create_io_worker(wq, acct->index); + return create_io_worker(wq, acct); } static void io_wq_inc_running(struct io_worker *worker) @@ -343,16 +349,16 @@ static void create_worker_cb(struct callback_head *cb) worker = container_of(cb, struct io_worker, create_work); wq = worker->wq; - acct = &wq->acct[worker->create_index]; - raw_spin_lock(&wq->lock); + acct = worker->acct; + raw_spin_lock(&acct->workers_lock); if (acct->nr_workers < acct->max_workers) { acct->nr_workers++; do_create = true; } - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); if (do_create) { - create_io_worker(wq, worker->create_index); + create_io_worker(wq, acct); } else { atomic_dec(&acct->nr_running); io_worker_ref_put(wq); @@ -384,7 +390,6 @@ static bool io_queue_worker_create(struct io_worker *worker, atomic_inc(&wq->worker_refs); init_task_work(&worker->create_work, func); - worker->create_index = acct->index; if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) { /* * EXIT may have been set after checking it above, check after @@ -430,31 +435,36 @@ static void io_wq_dec_running(struct io_worker *worker) * Worker will start processing some work. Move it to the busy list, if * it's currently on the freelist */ -static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker) +static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker) { if (test_bit(IO_WORKER_F_FREE, &worker->flags)) { clear_bit(IO_WORKER_F_FREE, &worker->flags); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); hlist_nulls_del_init_rcu(&worker->nulls_node); - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); } } /* * No work, worker going to sleep. Move to freelist. */ -static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker) - __must_hold(wq->lock) +static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker) + __must_hold(acct->workers_lock) { if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) { set_bit(IO_WORKER_F_FREE, &worker->flags); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); + hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list); } } +static inline unsigned int __io_get_work_hash(unsigned int work_flags) +{ + return work_flags >> IO_WQ_HASH_SHIFT; +} + static inline unsigned int io_get_work_hash(struct io_wq_work *work) { - return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT; + return __io_get_work_hash(atomic_read(&work->flags)); } static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) @@ -475,26 +485,27 @@ static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) } static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct, - struct io_worker *worker) + struct io_wq *wq) __must_hold(acct->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; unsigned int stall_hash = -1U; - struct io_wq *wq = worker->wq; wq_list_for_each(node, prev, &acct->work_list) { + unsigned int work_flags; unsigned int hash; work = container_of(node, struct io_wq_work, list); /* not hashed, can run anytime */ - if (!io_wq_is_hashed(work)) { + work_flags = atomic_read(&work->flags); + if (!__io_wq_is_hashed(work_flags)) { wq_list_del(&acct->work_list, node, prev); return work; } - hash = io_get_work_hash(work); + hash = __io_get_work_hash(work_flags); /* all items with this hash lie in [work, tail] */ tail = wq->hash_tail[hash]; @@ -564,7 +575,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct, * can't make progress, any work completion or insertion will * clear the stalled flag. */ - work = io_get_next_work(acct, worker); + work = io_get_next_work(acct, wq); if (work) { /* * Make sure cancelation can find this, even before @@ -583,7 +594,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct, if (!work) break; - __io_worker_busy(wq, worker); + __io_worker_busy(acct, worker); io_assign_current_work(worker, work); __set_current_state(TASK_RUNNING); @@ -591,12 +602,15 @@ static void io_worker_handle_work(struct io_wq_acct *acct, /* handle a whole dependent link */ do { struct io_wq_work *next_hashed, *linked; - unsigned int hash = io_get_work_hash(work); + unsigned int work_flags = atomic_read(&work->flags); + unsigned int hash = __io_wq_is_hashed(work_flags) + ? __io_get_work_hash(work_flags) + : -1U; next_hashed = wq_next_work(work); if (do_kill && - (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND)) + (work_flags & IO_WQ_WORK_UNBOUND)) atomic_or(IO_WQ_WORK_CANCEL, &work->flags); wq->do_work(work); io_assign_current_work(worker, NULL); @@ -634,7 +648,7 @@ static int io_wq_worker(void *data) struct io_wq_acct *acct = io_wq_get_acct(worker); struct io_wq *wq = worker->wq; bool exit_mask = false, last_timeout = false; - char buf[TASK_COMM_LEN]; + char buf[TASK_COMM_LEN] = {}; set_mask_bits(&worker->flags, 0, BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING)); @@ -654,20 +668,20 @@ static int io_wq_worker(void *data) while (io_acct_run_queue(acct)) io_worker_handle_work(acct, worker); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); /* * Last sleep timed out. Exit if we're not the last worker, * or if someone modified our affinity. */ if (last_timeout && (exit_mask || acct->nr_workers > 1)) { acct->nr_workers--; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); __set_current_state(TASK_RUNNING); break; } last_timeout = false; - __io_worker_idle(wq, worker); - raw_spin_unlock(&wq->lock); + __io_worker_idle(acct, worker); + raw_spin_unlock(&acct->workers_lock); if (io_run_task_work()) continue; ret = schedule_timeout(WORKER_IDLE_TIMEOUT); @@ -728,18 +742,18 @@ void io_wq_worker_sleeping(struct task_struct *tsk) io_wq_dec_running(worker); } -static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker, +static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker, struct task_struct *tsk) { tsk->worker_private = worker; worker->task = tsk; set_cpus_allowed_ptr(tsk, wq->cpu_mask); - raw_spin_lock(&wq->lock); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); - list_add_tail_rcu(&worker->all_list, &wq->all_list); + raw_spin_lock(&acct->workers_lock); + hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list); + list_add_tail_rcu(&worker->all_list, &acct->all_list); set_bit(IO_WORKER_F_FREE, &worker->flags); - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); wake_up_new_task(tsk); } @@ -770,25 +784,37 @@ static inline bool io_should_retry_thread(struct io_worker *worker, long err) } } +static void queue_create_worker_retry(struct io_worker *worker) +{ + /* + * We only bother retrying because there's a chance that the + * failure to create a worker is due to some temporary condition + * in the forking task (e.g. outstanding signal); give the task + * some time to clear that condition. + */ + schedule_delayed_work(&worker->work, + msecs_to_jiffies(worker->init_retries * 5)); +} + static void create_worker_cont(struct callback_head *cb) { struct io_worker *worker; struct task_struct *tsk; struct io_wq *wq; + struct io_wq_acct *acct; worker = container_of(cb, struct io_worker, create_work); clear_bit_unlock(0, &worker->create_state); wq = worker->wq; + acct = io_wq_get_acct(worker); tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { - io_init_new_worker(wq, worker, tsk); + io_init_new_worker(wq, acct, worker, tsk); io_worker_release(worker); return; } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { - struct io_wq_acct *acct = io_wq_get_acct(worker); - atomic_dec(&acct->nr_running); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); acct->nr_workers--; if (!acct->nr_workers) { struct io_cb_cancel_data match = { @@ -796,11 +822,11 @@ static void create_worker_cont(struct callback_head *cb) .cancel_all = true, }; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); while (io_acct_cancel_pending_work(wq, acct, &match)) ; } else { - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); } io_worker_ref_put(wq); kfree(worker); @@ -809,21 +835,21 @@ static void create_worker_cont(struct callback_head *cb) /* re-create attempts grab a new worker ref, drop the existing one */ io_worker_release(worker); - schedule_work(&worker->work); + queue_create_worker_retry(worker); } static void io_workqueue_create(struct work_struct *work) { - struct io_worker *worker = container_of(work, struct io_worker, work); + struct io_worker *worker = container_of(work, struct io_worker, + work.work); struct io_wq_acct *acct = io_wq_get_acct(worker); if (!io_queue_worker_create(worker, acct, create_worker_cont)) kfree(worker); } -static bool create_io_worker(struct io_wq *wq, int index) +static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct) { - struct io_wq_acct *acct = &wq->acct[index]; struct io_worker *worker; struct task_struct *tsk; @@ -833,30 +859,28 @@ static bool create_io_worker(struct io_wq *wq, int index) if (!worker) { fail: atomic_dec(&acct->nr_running); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); acct->nr_workers--; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); io_worker_ref_put(wq); return false; } refcount_set(&worker->ref, 1); worker->wq = wq; + worker->acct = acct; raw_spin_lock_init(&worker->lock); init_completion(&worker->ref_done); - if (index == IO_WQ_ACCT_BOUND) - set_bit(IO_WORKER_F_BOUND, &worker->flags); - tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { - io_init_new_worker(wq, worker, tsk); + io_init_new_worker(wq, acct, worker, tsk); } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { kfree(worker); goto fail; } else { - INIT_WORK(&worker->work, io_workqueue_create); - schedule_work(&worker->work); + INIT_DELAYED_WORK(&worker->work, io_workqueue_create); + queue_create_worker_retry(worker); } return true; @@ -866,14 +890,14 @@ fail: * Iterate the passed in list and call the specific function for each * worker that isn't exiting */ -static bool io_wq_for_each_worker(struct io_wq *wq, - bool (*func)(struct io_worker *, void *), - void *data) +static bool io_acct_for_each_worker(struct io_wq_acct *acct, + bool (*func)(struct io_worker *, void *), + void *data) { struct io_worker *worker; bool ret = false; - list_for_each_entry_rcu(worker, &wq->all_list, all_list) { + list_for_each_entry_rcu(worker, &acct->all_list, all_list) { if (io_worker_get(worker)) { /* no task if node is/was offline */ if (worker->task) @@ -887,6 +911,18 @@ static bool io_wq_for_each_worker(struct io_wq *wq, return ret; } +static bool io_wq_for_each_worker(struct io_wq *wq, + bool (*func)(struct io_worker *, void *), + void *data) +{ + for (int i = 0; i < IO_WQ_ACCT_NR; i++) { + if (!io_acct_for_each_worker(&wq->acct[i], func, data)) + return false; + } + + return true; +} + static bool io_wq_worker_wake(struct io_worker *worker, void *data) { __set_notify_signal(worker->task); @@ -903,19 +939,19 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) } while (work); } -static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work) +static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct, + struct io_wq_work *work, unsigned int work_flags) { - struct io_wq_acct *acct = io_work_get_acct(wq, work); unsigned int hash; struct io_wq_work *tail; - if (!io_wq_is_hashed(work)) { + if (!__io_wq_is_hashed(work_flags)) { append: wq_list_add_tail(&work->list, &acct->work_list); return; } - hash = io_get_work_hash(work); + hash = __io_get_work_hash(work_flags); tail = wq->hash_tail[hash]; wq->hash_tail[hash] = work; if (!tail) @@ -931,8 +967,8 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data) void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) { - struct io_wq_acct *acct = io_work_get_acct(wq, work); unsigned int work_flags = atomic_read(&work->flags); + struct io_wq_acct *acct = io_work_get_acct(wq, work_flags); struct io_cb_cancel_data match = { .fn = io_wq_work_match_item, .data = work, @@ -951,12 +987,12 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) } raw_spin_lock(&acct->lock); - io_wq_insert_work(wq, work); + io_wq_insert_work(wq, acct, work, work_flags); clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); raw_spin_unlock(&acct->lock); rcu_read_lock(); - do_create = !io_wq_activate_free_worker(wq, acct); + do_create = !io_acct_activate_free_worker(acct); rcu_read_unlock(); if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || @@ -967,12 +1003,12 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) if (likely(did_create)) return; - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); if (acct->nr_workers) { - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); return; } - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); /* fatal condition, failed to create the first worker */ io_acct_cancel_pending_work(wq, acct, &match); @@ -1021,10 +1057,10 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) } static inline void io_wq_remove_pending(struct io_wq *wq, + struct io_wq_acct *acct, struct io_wq_work *work, struct io_wq_work_node *prev) { - struct io_wq_acct *acct = io_work_get_acct(wq, work); unsigned int hash = io_get_work_hash(work); struct io_wq_work *prev_work = NULL; @@ -1051,7 +1087,7 @@ static bool io_acct_cancel_pending_work(struct io_wq *wq, work = container_of(node, struct io_wq_work, list); if (!match->fn(work, match->data)) continue; - io_wq_remove_pending(wq, work, prev); + io_wq_remove_pending(wq, acct, work, prev); raw_spin_unlock(&acct->lock); io_run_cancel(work, wq); match->nr_pending++; @@ -1079,11 +1115,22 @@ retry: } } +static void io_acct_cancel_running_work(struct io_wq_acct *acct, + struct io_cb_cancel_data *match) +{ + raw_spin_lock(&acct->workers_lock); + io_acct_for_each_worker(acct, io_wq_worker_cancel, match); + raw_spin_unlock(&acct->workers_lock); +} + static void io_wq_cancel_running_work(struct io_wq *wq, struct io_cb_cancel_data *match) { rcu_read_lock(); - io_wq_for_each_worker(wq, io_wq_worker_cancel, match); + + for (int i = 0; i < IO_WQ_ACCT_NR; i++) + io_acct_cancel_running_work(&wq->acct[i], match); + rcu_read_unlock(); } @@ -1106,16 +1153,14 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, * as an indication that we attempt to signal cancellation. The * completion will run normally in this case. * - * Do both of these while holding the wq->lock, to ensure that + * Do both of these while holding the acct->workers_lock, to ensure that * we'll find a work item regardless of state. */ io_wq_cancel_pending_work(wq, &match); if (match.nr_pending && !match.cancel_all) return IO_WQ_CANCEL_OK; - raw_spin_lock(&wq->lock); io_wq_cancel_running_work(wq, &match); - raw_spin_unlock(&wq->lock); if (match.nr_running && !match.cancel_all) return IO_WQ_CANCEL_RUNNING; @@ -1139,7 +1184,7 @@ static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode, struct io_wq_acct *acct = &wq->acct[i]; if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags)) - io_wq_activate_free_worker(wq, acct); + io_acct_activate_free_worker(acct); } rcu_read_unlock(); return 1; @@ -1177,22 +1222,24 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) for (i = 0; i < IO_WQ_ACCT_NR; i++) { struct io_wq_acct *acct = &wq->acct[i]; - acct->index = i; atomic_set(&acct->nr_running, 0); + + raw_spin_lock_init(&acct->workers_lock); + INIT_HLIST_NULLS_HEAD(&acct->free_list, 0); + INIT_LIST_HEAD(&acct->all_list); + INIT_WQ_LIST(&acct->work_list); raw_spin_lock_init(&acct->lock); } - raw_spin_lock_init(&wq->lock); - INIT_HLIST_NULLS_HEAD(&wq->free_list, 0); - INIT_LIST_HEAD(&wq->all_list); - wq->task = get_task_struct(data->task); atomic_set(&wq->worker_refs, 1); init_completion(&wq->worker_done); ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node); - if (ret) + if (ret) { + put_task_struct(wq->task); goto err; + } return wq; err: @@ -1372,14 +1419,14 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count) rcu_read_lock(); - raw_spin_lock(&wq->lock); for (i = 0; i < IO_WQ_ACCT_NR; i++) { acct = &wq->acct[i]; + raw_spin_lock(&acct->workers_lock); prev[i] = max_t(int, acct->max_workers, prev[i]); if (new_count[i]) acct->max_workers = new_count[i]; + raw_spin_unlock(&acct->workers_lock); } - raw_spin_unlock(&wq->lock); rcu_read_unlock(); for (i = 0; i < IO_WQ_ACCT_NR; i++) |