diff options
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 445 |
1 files changed, 274 insertions, 171 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index cd9bd095fb1b..6c55362c1f99 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -23,8 +23,7 @@ enum { IO_WORKER_F_UP = 1, /* up and active */ IO_WORKER_F_RUNNING = 2, /* account as running */ IO_WORKER_F_FREE = 4, /* worker on free list */ - IO_WORKER_F_FIXED = 8, /* static idle worker */ - IO_WORKER_F_BOUND = 16, /* is doing bounded work */ + IO_WORKER_F_BOUND = 8, /* is doing bounded work */ }; enum { @@ -32,7 +31,7 @@ enum { }; enum { - IO_WQE_FLAG_STALLED = 1, /* stalled on hash */ + IO_ACCT_STALLED_BIT = 0, /* stalled on hash */ }; /* @@ -55,7 +54,10 @@ struct io_worker { struct callback_head create_work; int create_index; - struct rcu_head rcu; + union { + struct rcu_head rcu; + struct work_struct work; + }; }; #if BITS_PER_LONG == 64 @@ -71,25 +73,24 @@ struct io_wqe_acct { unsigned max_workers; int index; atomic_t nr_running; + struct io_wq_work_list work_list; + unsigned long flags; }; enum { IO_WQ_ACCT_BOUND, IO_WQ_ACCT_UNBOUND, + IO_WQ_ACCT_NR, }; /* * Per-node worker thread pool */ struct io_wqe { - struct { - raw_spinlock_t lock; - struct io_wq_work_list work_list; - unsigned flags; - } ____cacheline_aligned_in_smp; + raw_spinlock_t lock; + struct io_wqe_acct acct[2]; int node; - struct io_wqe_acct acct[2]; struct hlist_nulls_head free_list; struct list_head all_list; @@ -133,8 +134,11 @@ struct io_cb_cancel_data { bool cancel_all; }; -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first); +static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); static void io_wqe_dec_running(struct io_worker *worker); +static bool io_acct_cancel_pending_work(struct io_wqe *wqe, + struct io_wqe_acct *acct, + struct io_cb_cancel_data *match); static bool io_worker_get(struct io_worker *worker) { @@ -195,11 +199,10 @@ static void io_worker_exit(struct io_worker *worker) do_exit(0); } -static inline bool io_wqe_run_queue(struct io_wqe *wqe) - __must_hold(wqe->lock) +static inline bool io_acct_run_queue(struct io_wqe_acct *acct) { - if (!wq_list_empty(&wqe->work_list) && - !(wqe->flags & IO_WQE_FLAG_STALLED)) + if (!wq_list_empty(&acct->work_list) && + !test_bit(IO_ACCT_STALLED_BIT, &acct->flags)) return true; return false; } @@ -208,7 +211,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe) * Check head of free list for an available worker. If one isn't available, * caller must create one. */ -static bool io_wqe_activate_free_worker(struct io_wqe *wqe) +static bool io_wqe_activate_free_worker(struct io_wqe *wqe, + struct io_wqe_acct *acct) __must_hold(RCU) { struct hlist_nulls_node *n; @@ -222,6 +226,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) { if (!io_worker_get(worker)) continue; + if (io_wqe_get_acct(worker) != acct) { + io_worker_release(worker); + continue; + } if (wake_up_process(worker->task)) { io_worker_release(worker); return true; @@ -236,9 +244,9 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) * We need a worker. If we find a free one, we're good. If not, and we're * below the max number of workers, create one. */ -static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) +static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) { - bool ret; + bool do_create = false; /* * Most likely an attempt to queue unbounded work on an io_wq that @@ -247,27 +255,19 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) if (unlikely(!acct->max_workers)) pr_warn_once("io-wq is not configured for unbound workers"); - rcu_read_lock(); - ret = io_wqe_activate_free_worker(wqe); - rcu_read_unlock(); - - if (!ret) { - bool do_create = false, first = false; - - raw_spin_lock(&wqe->lock); - if (acct->nr_workers < acct->max_workers) { - if (!acct->nr_workers) - first = true; - acct->nr_workers++; - do_create = true; - } - raw_spin_unlock(&wqe->lock); - if (do_create) { - atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); - create_io_worker(wqe->wq, wqe, acct->index, first); - } + raw_spin_lock(&wqe->lock); + if (acct->nr_workers < acct->max_workers) { + acct->nr_workers++; + do_create = true; } + raw_spin_unlock(&wqe->lock); + if (do_create) { + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + return create_io_worker(wqe->wq, wqe, acct->index); + } + + return true; } static void io_wqe_inc_running(struct io_worker *worker) @@ -283,7 +283,7 @@ static void create_worker_cb(struct callback_head *cb) struct io_wq *wq; struct io_wqe *wqe; struct io_wqe_acct *acct; - bool do_create = false, first = false; + bool do_create = false; worker = container_of(cb, struct io_worker, create_work); wqe = worker->wqe; @@ -291,14 +291,12 @@ static void create_worker_cb(struct callback_head *cb) acct = &wqe->acct[worker->create_index]; raw_spin_lock(&wqe->lock); if (acct->nr_workers < acct->max_workers) { - if (!acct->nr_workers) - first = true; acct->nr_workers++; do_create = true; } raw_spin_unlock(&wqe->lock); if (do_create) { - create_io_worker(wq, wqe, worker->create_index, first); + create_io_worker(wq, wqe, worker->create_index); } else { atomic_dec(&acct->nr_running); io_worker_ref_put(wq); @@ -307,9 +305,11 @@ static void create_worker_cb(struct callback_head *cb) io_worker_release(worker); } -static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker, - struct io_wqe_acct *acct) +static bool io_queue_worker_create(struct io_worker *worker, + struct io_wqe_acct *acct, + task_work_func_t func) { + struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; /* raced with exit, just ignore create call */ @@ -327,16 +327,17 @@ static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker, test_and_set_bit_lock(0, &worker->create_state)) goto fail_release; - init_task_work(&worker->create_work, create_worker_cb); + init_task_work(&worker->create_work, func); worker->create_index = acct->index; if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) - return; + return true; clear_bit_unlock(0, &worker->create_state); fail_release: io_worker_release(worker); fail: atomic_dec(&acct->nr_running); io_worker_ref_put(wq); + return false; } static void io_wqe_dec_running(struct io_worker *worker) @@ -348,10 +349,10 @@ static void io_wqe_dec_running(struct io_worker *worker) if (!(worker->flags & IO_WORKER_F_UP)) return; - if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) { + if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) { atomic_inc(&acct->nr_running); atomic_inc(&wqe->wq->worker_refs); - io_queue_worker_create(wqe, worker, acct); + io_queue_worker_create(worker, acct, create_worker_cb); } } @@ -363,29 +364,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, struct io_wq_work *work) __must_hold(wqe->lock) { - bool worker_bound, work_bound; - - BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1); - if (worker->flags & IO_WORKER_F_FREE) { worker->flags &= ~IO_WORKER_F_FREE; hlist_nulls_del_init_rcu(&worker->nulls_node); } - - /* - * If worker is moving from bound to unbound (or vice versa), then - * ensure we update the running accounting. - */ - worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0; - work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0; - if (worker_bound != work_bound) { - int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND; - io_wqe_dec_running(worker); - worker->flags ^= IO_WORKER_F_BOUND; - wqe->acct[index].nr_workers--; - wqe->acct[index ^ 1].nr_workers++; - io_wqe_inc_running(worker); - } } /* @@ -413,7 +395,7 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) { struct io_wq *wq = wqe->wq; - spin_lock(&wq->hash->wait.lock); + spin_lock_irq(&wq->hash->wait.lock); if (list_empty(&wqe->wait.entry)) { __add_wait_queue(&wq->hash->wait, &wqe->wait); if (!test_bit(hash, &wq->hash->map)) { @@ -421,48 +403,26 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) list_del_init(&wqe->wait.entry); } } - spin_unlock(&wq->hash->wait.lock); -} - -/* - * We can always run the work if the worker is currently the same type as - * the work (eg both are bound, or both are unbound). If they are not the - * same, only allow it if incrementing the worker count would be allowed. - */ -static bool io_worker_can_run_work(struct io_worker *worker, - struct io_wq_work *work) -{ - struct io_wqe_acct *acct; - - if (!(worker->flags & IO_WORKER_F_BOUND) != - !(work->flags & IO_WQ_WORK_UNBOUND)) - return true; - - /* not the same type, check if we'd go over the limit */ - acct = io_work_get_acct(worker->wqe, work); - return acct->nr_workers < acct->max_workers; + spin_unlock_irq(&wq->hash->wait.lock); } -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, - struct io_worker *worker, - bool *stalled) +static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, + struct io_worker *worker) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; unsigned int stall_hash = -1U; + struct io_wqe *wqe = worker->wqe; - wq_list_for_each(node, prev, &wqe->work_list) { + wq_list_for_each(node, prev, &acct->work_list) { unsigned int hash; work = container_of(node, struct io_wq_work, list); - if (!io_worker_can_run_work(worker, work)) - break; - /* not hashed, can run anytime */ if (!io_wq_is_hashed(work)) { - wq_list_del(&wqe->work_list, node, prev); + wq_list_del(&acct->work_list, node, prev); return work; } @@ -473,7 +433,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, /* hashed, can run if not already running */ if (!test_and_set_bit(hash, &wqe->wq->hash->map)) { wqe->hash_tail[hash] = NULL; - wq_list_cut(&wqe->work_list, &tail->list, prev); + wq_list_cut(&acct->work_list, &tail->list, prev); return work; } if (stall_hash == -1U) @@ -483,10 +443,14 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, } if (stall_hash != -1U) { + /* + * Set this before dropping the lock to avoid racing with new + * work being added and clearing the stalled bit. + */ + set_bit(IO_ACCT_STALLED_BIT, &acct->flags); raw_spin_unlock(&wqe->lock); io_wait_on_hash(wqe, stall_hash); raw_spin_lock(&wqe->lock); - *stalled = true; } return NULL; @@ -520,13 +484,13 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); static void io_worker_handle_work(struct io_worker *worker) __releases(wqe->lock) { + struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state); do { struct io_wq_work *work; - bool stalled; get_next: /* * If we got some work, mark us as busy. If we didn't, but @@ -535,12 +499,9 @@ get_next: * can't make progress, any work completion or insertion will * clear the stalled flag. */ - stalled = false; - work = io_get_next_work(wqe, worker, &stalled); + work = io_get_next_work(acct, worker); if (work) __io_worker_busy(wqe, worker, work); - else if (stalled) - wqe->flags |= IO_WQE_FLAG_STALLED; raw_spin_unlock(&wqe->lock); if (!work) @@ -572,10 +533,10 @@ get_next: if (hash != -1U && !next_hashed) { clear_bit(hash, &wq->hash->map); + clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); if (wq_has_sleeper(&wq->hash->wait)) wake_up(&wq->hash->wait); raw_spin_lock(&wqe->lock); - wqe->flags &= ~IO_WQE_FLAG_STALLED; /* skip unnecessary unlock-lock wqe->lock */ if (!work) goto get_next; @@ -590,8 +551,10 @@ get_next: static int io_wqe_worker(void *data) { struct io_worker *worker = data; + struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; + bool last_timeout = false; char buf[TASK_COMM_LEN]; worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); @@ -605,10 +568,17 @@ static int io_wqe_worker(void *data) set_current_state(TASK_INTERRUPTIBLE); loop: raw_spin_lock(&wqe->lock); - if (io_wqe_run_queue(wqe)) { + if (io_acct_run_queue(acct)) { io_worker_handle_work(worker); goto loop; } + /* timed out, exit unless we're the last worker */ + if (last_timeout && acct->nr_workers > 1) { + raw_spin_unlock(&wqe->lock); + __set_current_state(TASK_RUNNING); + break; + } + last_timeout = false; __io_worker_idle(wqe, worker); raw_spin_unlock(&wqe->lock); if (io_flush_signals()) @@ -619,13 +589,11 @@ loop: if (!get_signal(&ksig)) continue; - break; - } - if (ret) + if (fatal_signal_pending(current)) + break; continue; - /* timed out, exit unless we're the fixed worker */ - if (!(worker->flags & IO_WORKER_F_FIXED)) - break; + } + last_timeout = !ret; } if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { @@ -676,51 +644,131 @@ void io_wq_worker_sleeping(struct task_struct *tsk) raw_spin_unlock(&worker->wqe->lock); } -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first) +static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker, + struct task_struct *tsk) +{ + tsk->pf_io_worker = worker; + worker->task = tsk; + set_cpus_allowed_ptr(tsk, wqe->cpu_mask); + tsk->flags |= PF_NO_SETAFFINITY; + + raw_spin_lock(&wqe->lock); + hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); + list_add_tail_rcu(&worker->all_list, &wqe->all_list); + worker->flags |= IO_WORKER_F_FREE; + raw_spin_unlock(&wqe->lock); + wake_up_new_task(tsk); +} + +static bool io_wq_work_match_all(struct io_wq_work *work, void *data) +{ + return true; +} + +static inline bool io_should_retry_thread(long err) +{ + switch (err) { + case -EAGAIN: + case -ERESTARTSYS: + case -ERESTARTNOINTR: + case -ERESTARTNOHAND: + return true; + default: + return false; + } +} + +static void create_worker_cont(struct callback_head *cb) { - struct io_wqe_acct *acct = &wqe->acct[index]; struct io_worker *worker; struct task_struct *tsk; + struct io_wqe *wqe; - __set_current_state(TASK_RUNNING); + worker = container_of(cb, struct io_worker, create_work); + clear_bit_unlock(0, &worker->create_state); + wqe = worker->wqe; + tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + if (!IS_ERR(tsk)) { + io_init_new_worker(wqe, worker, tsk); + io_worker_release(worker); + return; + } else if (!io_should_retry_thread(PTR_ERR(tsk))) { + struct io_wqe_acct *acct = io_wqe_get_acct(worker); - worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); - if (!worker) - goto fail; + atomic_dec(&acct->nr_running); + raw_spin_lock(&wqe->lock); + acct->nr_workers--; + if (!acct->nr_workers) { + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_all, + .cancel_all = true, + }; - refcount_set(&worker->ref, 1); - worker->nulls_node.pprev = NULL; - worker->wqe = wqe; - spin_lock_init(&worker->lock); - init_completion(&worker->ref_done); + while (io_acct_cancel_pending_work(wqe, acct, &match)) + raw_spin_lock(&wqe->lock); + } + raw_spin_unlock(&wqe->lock); + io_worker_ref_put(wqe->wq); + kfree(worker); + return; + } - tsk = create_io_thread(io_wqe_worker, worker, wqe->node); - if (IS_ERR(tsk)) { + /* re-create attempts grab a new worker ref, drop the existing one */ + io_worker_release(worker); + schedule_work(&worker->work); +} + +static void io_workqueue_create(struct work_struct *work) +{ + struct io_worker *worker = container_of(work, struct io_worker, work); + struct io_wqe_acct *acct = io_wqe_get_acct(worker); + + if (!io_queue_worker_create(worker, acct, create_worker_cont)) { + clear_bit_unlock(0, &worker->create_state); + io_worker_release(worker); kfree(worker); + } +} + +static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +{ + struct io_wqe_acct *acct = &wqe->acct[index]; + struct io_worker *worker; + struct task_struct *tsk; + + __set_current_state(TASK_RUNNING); + + worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); + if (!worker) { fail: atomic_dec(&acct->nr_running); raw_spin_lock(&wqe->lock); acct->nr_workers--; raw_spin_unlock(&wqe->lock); io_worker_ref_put(wq); - return; + return false; } - tsk->pf_io_worker = worker; - worker->task = tsk; - set_cpus_allowed_ptr(tsk, wqe->cpu_mask); - tsk->flags |= PF_NO_SETAFFINITY; + refcount_set(&worker->ref, 1); + worker->wqe = wqe; + spin_lock_init(&worker->lock); + init_completion(&worker->ref_done); - raw_spin_lock(&wqe->lock); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); - list_add_tail_rcu(&worker->all_list, &wqe->all_list); - worker->flags |= IO_WORKER_F_FREE; if (index == IO_WQ_ACCT_BOUND) worker->flags |= IO_WORKER_F_BOUND; - if (first && (worker->flags & IO_WORKER_F_BOUND)) - worker->flags |= IO_WORKER_F_FIXED; - raw_spin_unlock(&wqe->lock); - wake_up_new_task(tsk); + + tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + if (!IS_ERR(tsk)) { + io_init_new_worker(wqe, worker, tsk); + } else if (!io_should_retry_thread(PTR_ERR(tsk))) { + kfree(worker); + goto fail; + } else { + INIT_WORK(&worker->work, io_workqueue_create); + schedule_work(&worker->work); + } + + return true; } /* @@ -755,11 +803,6 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) return false; } -static bool io_wq_work_match_all(struct io_wq_work *work, void *data) -{ - return true; -} - static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) { struct io_wq *wq = wqe->wq; @@ -773,12 +816,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) { + struct io_wqe_acct *acct = io_work_get_acct(wqe, work); unsigned int hash; struct io_wq_work *tail; if (!io_wq_is_hashed(work)) { append: - wq_list_add_tail(&work->list, &wqe->work_list); + wq_list_add_tail(&work->list, &acct->work_list); return; } @@ -788,13 +832,19 @@ append: if (!tail) goto append; - wq_list_add_after(&work->list, &tail->list, &wqe->work_list); + wq_list_add_after(&work->list, &tail->list, &acct->work_list); +} + +static bool io_wq_work_match_item(struct io_wq_work *work, void *data) +{ + return work == data; } static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); - bool do_wake; + unsigned work_flags = work->flags; + bool do_create; /* * If io-wq is exiting for this task, or if the request has explicitly @@ -808,13 +858,36 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) raw_spin_lock(&wqe->lock); io_wqe_insert_work(wqe, work); - wqe->flags &= ~IO_WQE_FLAG_STALLED; - do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) || - !atomic_read(&acct->nr_running); + clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); + + rcu_read_lock(); + do_create = !io_wqe_activate_free_worker(wqe, acct); + rcu_read_unlock(); + raw_spin_unlock(&wqe->lock); - if (do_wake) - io_wqe_wake_worker(wqe, acct); + if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || + !atomic_read(&acct->nr_running))) { + bool did_create; + + did_create = io_wqe_create_worker(wqe, acct); + if (likely(did_create)) + return; + + raw_spin_lock(&wqe->lock); + /* fatal condition, failed to create the first worker */ + if (!acct->nr_workers) { + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_item, + .data = work, + .cancel_all = false, + }; + + if (io_acct_cancel_pending_work(wqe, acct, &match)) + raw_spin_lock(&wqe->lock); + } + raw_spin_unlock(&wqe->lock); + } } void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) @@ -859,6 +932,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, struct io_wq_work *work, struct io_wq_work_node *prev) { + struct io_wqe_acct *acct = io_work_get_acct(wqe, work); unsigned int hash = io_get_work_hash(work); struct io_wq_work *prev_work = NULL; @@ -870,18 +944,18 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, else wqe->hash_tail[hash] = NULL; } - wq_list_del(&wqe->work_list, &work->list, prev); + wq_list_del(&acct->work_list, &work->list, prev); } -static void io_wqe_cancel_pending_work(struct io_wqe *wqe, - struct io_cb_cancel_data *match) +static bool io_acct_cancel_pending_work(struct io_wqe *wqe, + struct io_wqe_acct *acct, + struct io_cb_cancel_data *match) + __releases(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; -retry: - raw_spin_lock(&wqe->lock); - wq_list_for_each(node, prev, &wqe->work_list) { + wq_list_for_each(node, prev, &acct->work_list) { work = container_of(node, struct io_wq_work, list); if (!match->fn(work, match->data)) continue; @@ -889,11 +963,27 @@ retry: raw_spin_unlock(&wqe->lock); io_run_cancel(work, wqe); match->nr_pending++; - if (!match->cancel_all) - return; - /* not safe to continue after unlock */ - goto retry; + return true; + } + + return false; +} + +static void io_wqe_cancel_pending_work(struct io_wqe *wqe, + struct io_cb_cancel_data *match) +{ + int i; +retry: + raw_spin_lock(&wqe->lock); + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wqe_acct *acct = io_get_acct(wqe, i == 0); + + if (io_acct_cancel_pending_work(wqe, acct, match)) { + if (match->cancel_all) + goto retry; + return; + } } raw_spin_unlock(&wqe->lock); } @@ -954,18 +1044,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { struct io_wqe *wqe = container_of(wait, struct io_wqe, wait); + int i; list_del_init(&wait->entry); rcu_read_lock(); - io_wqe_activate_free_worker(wqe); + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wqe_acct *acct = &wqe->acct[i]; + + if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags)) + io_wqe_activate_free_worker(wqe, acct); + } rcu_read_unlock(); return 1; } struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) { - int ret, node; + int ret, node, i; struct io_wq *wq; if (WARN_ON_ONCE(!data->free_work || !data->do_work)) @@ -1000,18 +1096,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) cpumask_copy(wqe->cpu_mask, cpumask_of_node(node)); wq->wqes[node] = wqe; wqe->node = alloc_node; - wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND; - wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; - atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = task_rlimit(current, RLIMIT_NPROC); - atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0); - wqe->wait.func = io_wqe_hash_wake; INIT_LIST_HEAD(&wqe->wait.entry); + wqe->wait.func = io_wqe_hash_wake; + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wqe_acct *acct = &wqe->acct[i]; + + acct->index = i; + atomic_set(&acct->nr_running, 0); + INIT_WQ_LIST(&acct->work_list); + } wqe->wq = wq; raw_spin_lock_init(&wqe->lock); - INIT_WQ_LIST(&wqe->work_list); INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0); INIT_LIST_HEAD(&wqe->all_list); } @@ -1038,7 +1136,7 @@ static bool io_task_work_match(struct callback_head *cb, void *data) { struct io_worker *worker; - if (cb->func != create_worker_cb) + if (cb->func != create_worker_cb && cb->func != create_worker_cont) return false; worker = container_of(cb, struct io_worker, create_work); return worker->wqe->wq == data; @@ -1059,9 +1157,14 @@ static void io_wq_exit_workers(struct io_wq *wq) while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) { struct io_worker *worker; + struct io_wqe_acct *acct; worker = container_of(cb, struct io_worker, create_work); - atomic_dec(&worker->wqe->acct[worker->create_index].nr_running); + acct = io_wqe_get_acct(worker); + atomic_dec(&acct->nr_running); + raw_spin_lock(&worker->wqe->lock); + acct->nr_workers--; + raw_spin_unlock(&worker->wqe->lock); io_worker_ref_put(wq); clear_bit_unlock(0, &worker->create_state); io_worker_release(worker); @@ -1193,7 +1296,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count) for_each_node(node) { struct io_wqe_acct *acct; - for (i = 0; i < 2; i++) { + for (i = 0; i < IO_WQ_ACCT_NR; i++) { acct = &wq->wqes[node]->acct[i]; prev = max_t(int, acct->max_workers, prev); if (new_count[i]) |