Diffstat (limited to 'io_uring/io-wq.c')
-rw-r--r--  io_uring/io-wq.c | 65
1 file changed, 44 insertions(+), 21 deletions(-)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index a38f36b68060..24f06fba4309 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -64,7 +64,7 @@ struct io_worker {
union {
struct rcu_head rcu;
- struct work_struct work;
+ struct delayed_work work;
};
};
@@ -160,9 +160,9 @@ static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
}
static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
- struct io_wq_work *work)
+ unsigned int work_flags)
{
- return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND));
+ return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}
static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
@@ -452,9 +452,14 @@ static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
}
}
+static inline unsigned int __io_get_work_hash(unsigned int work_flags)
+{
+ return work_flags >> IO_WQ_HASH_SHIFT;
+}
+
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
- return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT;
+ return __io_get_work_hash(atomic_read(&work->flags));
}
static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
@@ -484,17 +489,19 @@ static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
struct io_wq *wq = worker->wq;
wq_list_for_each(node, prev, &acct->work_list) {
+ unsigned int work_flags;
unsigned int hash;
work = container_of(node, struct io_wq_work, list);
/* not hashed, can run anytime */
- if (!io_wq_is_hashed(work)) {
+ work_flags = atomic_read(&work->flags);
+ if (!__io_wq_is_hashed(work_flags)) {
wq_list_del(&acct->work_list, node, prev);
return work;
}
- hash = io_get_work_hash(work);
+ hash = __io_get_work_hash(work_flags);
/* all items with this hash lie in [work, tail] */
tail = wq->hash_tail[hash];
@@ -591,12 +598,15 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
/* handle a whole dependent link */
do {
struct io_wq_work *next_hashed, *linked;
- unsigned int hash = io_get_work_hash(work);
+ unsigned int work_flags = atomic_read(&work->flags);
+ unsigned int hash = __io_wq_is_hashed(work_flags)
+ ? __io_get_work_hash(work_flags)
+ : -1U;
next_hashed = wq_next_work(work);
if (do_kill &&
- (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND))
+ (work_flags & IO_WQ_WORK_UNBOUND))
atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
wq->do_work(work);
io_assign_current_work(worker, NULL);
@@ -634,7 +644,7 @@ static int io_wq_worker(void *data)
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
bool exit_mask = false, last_timeout = false;
- char buf[TASK_COMM_LEN];
+ char buf[TASK_COMM_LEN] = {};
set_mask_bits(&worker->flags, 0,
BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));
@@ -770,6 +780,18 @@ static inline bool io_should_retry_thread(struct io_worker *worker, long err)
}
}
+static void queue_create_worker_retry(struct io_worker *worker)
+{
+ /*
+ * We only bother retrying because there's a chance that the
+ * failure to create a worker is due to some temporary condition
+ * in the forking task (e.g. outstanding signal); give the task
+ * some time to clear that condition.
+ */
+ schedule_delayed_work(&worker->work,
+ msecs_to_jiffies(worker->init_retries * 5));
+}
+
static void create_worker_cont(struct callback_head *cb)
{
struct io_worker *worker;
@@ -809,12 +831,13 @@ static void create_worker_cont(struct callback_head *cb)
/* re-create attempts grab a new worker ref, drop the existing one */
io_worker_release(worker);
- schedule_work(&worker->work);
+ queue_create_worker_retry(worker);
}
static void io_workqueue_create(struct work_struct *work)
{
- struct io_worker *worker = container_of(work, struct io_worker, work);
+ struct io_worker *worker = container_of(work, struct io_worker,
+ work.work);
struct io_wq_acct *acct = io_wq_get_acct(worker);
if (!io_queue_worker_create(worker, acct, create_worker_cont))
@@ -855,8 +878,8 @@ fail:
kfree(worker);
goto fail;
} else {
- INIT_WORK(&worker->work, io_workqueue_create);
- schedule_work(&worker->work);
+ INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
+ queue_create_worker_retry(worker);
}
return true;
@@ -903,19 +926,19 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
} while (work);
}
-static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
+static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
+ struct io_wq_work *work, unsigned int work_flags)
{
- struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash;
struct io_wq_work *tail;
- if (!io_wq_is_hashed(work)) {
+ if (!__io_wq_is_hashed(work_flags)) {
append:
wq_list_add_tail(&work->list, &acct->work_list);
return;
}
- hash = io_get_work_hash(work);
+ hash = __io_get_work_hash(work_flags);
tail = wq->hash_tail[hash];
wq->hash_tail[hash] = work;
if (!tail)
@@ -931,8 +954,8 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
- struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int work_flags = atomic_read(&work->flags);
+ struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_item,
.data = work,
@@ -951,7 +974,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
}
raw_spin_lock(&acct->lock);
- io_wq_insert_work(wq, work);
+ io_wq_insert_work(wq, acct, work, work_flags);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
@@ -1021,10 +1044,10 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
}
static inline void io_wq_remove_pending(struct io_wq *wq,
+ struct io_wq_acct *acct,
struct io_wq_work *work,
struct io_wq_work_node *prev)
{
- struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL;
@@ -1051,7 +1074,7 @@ static bool io_acct_cancel_pending_work(struct io_wq *wq,
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
- io_wq_remove_pending(wq, work, prev);
+ io_wq_remove_pending(wq, acct, work, prev);
raw_spin_unlock(&acct->lock);
io_run_cancel(work, wq);
match->nr_pending++;
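
Note on the retry path introduced above: the worker's embedded work item becomes a delayed_work so that a failed worker-creation attempt can be re-queued with a small, linearly growing delay (init_retries * 5 ms) instead of being retried immediately. A minimal standalone sketch of that pattern follows, using the same workqueue APIs but hypothetical names (retry_ctx, do_create, MAX_RETRIES are illustrative, not taken from this commit):

#include <linux/workqueue.h>
#include <linux/jiffies.h>

struct retry_ctx {
	struct delayed_work work;	/* replaces a plain work_struct, as in io_worker */
	int init_retries;		/* creation attempts that have failed so far */
};

static bool do_create(struct retry_ctx *ctx);	/* hypothetical creation attempt */

#define MAX_RETRIES	3		/* hypothetical cap, not from this commit */

static void retry_fn(struct work_struct *work)
{
	/* delayed_work embeds a work_struct, hence container_of on work.work */
	struct retry_ctx *ctx = container_of(work, struct retry_ctx, work.work);

	if (do_create(ctx) || ++ctx->init_retries > MAX_RETRIES)
		return;

	/*
	 * Same backoff idea as queue_create_worker_retry(): give the forking
	 * task 5 ms per failed attempt to clear a transient condition
	 * (e.g. an outstanding signal) before trying again.
	 */
	schedule_delayed_work(&ctx->work,
			      msecs_to_jiffies(ctx->init_retries * 5));
}

static void start_retry(struct retry_ctx *ctx)
{
	INIT_DELAYED_WORK(&ctx->work, retry_fn);
	schedule_delayed_work(&ctx->work, 0);	/* first attempt runs without delay */
}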