summaryrefslogtreecommitdiff
path: root/io_uring/io_uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r--io_uring/io_uring.c111
1 files changed, 71 insertions, 40 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b521186efa5c..db623b3185c8 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -538,7 +538,7 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx)
} else {
atomic_inc(&ev_fd->refs);
if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
- call_rcu(&ev_fd->rcu, io_eventfd_ops);
+ call_rcu_hurry(&ev_fd->rcu, io_eventfd_ops);
else
atomic_dec(&ev_fd->refs);
}
@@ -572,12 +572,11 @@ static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
- if (ctx->off_timeout_used || ctx->drain_active) {
+ if (ctx->off_timeout_used)
+ io_flush_timeouts(ctx);
+ if (ctx->drain_active) {
spin_lock(&ctx->completion_lock);
- if (ctx->off_timeout_used)
- io_flush_timeouts(ctx);
- if (ctx->drain_active)
- io_queue_deferred(ctx);
+ io_queue_deferred(ctx);
spin_unlock(&ctx->completion_lock);
}
if (ctx->has_evfd)
@@ -597,6 +596,18 @@ static inline void __io_cq_unlock(struct io_ring_ctx *ctx)
spin_unlock(&ctx->completion_lock);
}
+static inline void io_cq_lock(struct io_ring_ctx *ctx)
+ __acquires(ctx->completion_lock)
+{
+ spin_lock(&ctx->completion_lock);
+}
+
+static inline void io_cq_unlock(struct io_ring_ctx *ctx)
+ __releases(ctx->completion_lock)
+{
+ spin_unlock(&ctx->completion_lock);
+}
+
/* keep it inlined for io_submit_flush_completions() */
static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
__releases(ctx->completion_lock)
@@ -666,16 +677,20 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
io_cq_unlock_post(ctx);
}
+static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
+{
+ /* iopoll syncs against uring_lock, not completion_lock */
+ if (ctx->flags & IORING_SETUP_IOPOLL)
+ mutex_lock(&ctx->uring_lock);
+ __io_cqring_overflow_flush(ctx);
+ if (ctx->flags & IORING_SETUP_IOPOLL)
+ mutex_unlock(&ctx->uring_lock);
+}
+
static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
{
- if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
- /* iopoll syncs against uring_lock, not completion_lock */
- if (ctx->flags & IORING_SETUP_IOPOLL)
- mutex_lock(&ctx->uring_lock);
- __io_cqring_overflow_flush(ctx);
- if (ctx->flags & IORING_SETUP_IOPOLL)
- mutex_unlock(&ctx->uring_lock);
- }
+ if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
+ io_cqring_do_overflow_flush(ctx);
}
void __io_put_task(struct task_struct *task, int nr)
@@ -716,6 +731,8 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
size_t ocq_size = sizeof(struct io_overflow_cqe);
bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
+ lockdep_assert_held(&ctx->completion_lock);
+
if (is_cqe32)
ocq_size += sizeof(struct io_uring_cqe);
@@ -805,9 +822,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
{
struct io_uring_cqe *cqe;
- if (!ctx->task_complete)
- lockdep_assert_held(&ctx->completion_lock);
-
ctx->cq_extra++;
/*
@@ -916,7 +930,7 @@ static void __io_req_complete_post(struct io_kiocb *req)
io_cq_lock(ctx);
if (!(req->flags & REQ_F_CQE_SKIP))
- __io_fill_cqe_req(ctx, req);
+ io_fill_cqe_req(ctx, req);
/*
* If we're the last reference to this request, add to our locked
@@ -1074,9 +1088,9 @@ static void __io_req_find_next_prep(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
- io_cq_lock(ctx);
+ spin_lock(&ctx->completion_lock);
io_disarm_next(req);
- io_cq_unlock_post(ctx);
+ spin_unlock(&ctx->completion_lock);
}
static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
@@ -1221,13 +1235,18 @@ static void io_req_local_work_add(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
- if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
+ percpu_ref_get(&ctx->refs);
+
+ if (!llist_add(&req->io_task_work.node, &ctx->work_llist)) {
+ percpu_ref_put(&ctx->refs);
return;
+ }
/* need it for the following io_cqring_wake() */
smp_mb__after_atomic();
if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
io_move_task_work_from_local(ctx);
+ percpu_ref_put(&ctx->refs);
return;
}
@@ -1237,6 +1256,7 @@ static void io_req_local_work_add(struct io_kiocb *req)
if (ctx->has_evfd)
io_eventfd_signal(ctx);
__io_cqring_wake(ctx);
+ percpu_ref_put(&ctx->refs);
}
void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
@@ -1745,17 +1765,12 @@ queue:
}
spin_unlock(&ctx->completion_lock);
- ret = io_req_prep_async(req);
- if (ret) {
-fail:
- io_req_defer_failed(req, ret);
- return;
- }
io_prep_async_link(req);
de = kmalloc(sizeof(*de), GFP_KERNEL);
if (!de) {
ret = -ENOMEM;
- goto fail;
+ io_req_defer_failed(req, ret);
+ return;
}
spin_lock(&ctx->completion_lock);
@@ -2028,13 +2043,16 @@ static void io_queue_sqe_fallback(struct io_kiocb *req)
req->flags &= ~REQ_F_HARDLINK;
req->flags |= REQ_F_LINK;
io_req_defer_failed(req, req->cqe.res);
- } else if (unlikely(req->ctx->drain_active)) {
- io_drain_req(req);
} else {
int ret = io_req_prep_async(req);
- if (unlikely(ret))
+ if (unlikely(ret)) {
io_req_defer_failed(req, ret);
+ return;
+ }
+
+ if (unlikely(req->ctx->drain_active))
+ io_drain_req(req);
else
io_queue_iowq(req, NULL);
}
@@ -2450,7 +2468,7 @@ int io_run_task_work_sig(struct io_ring_ctx *ctx)
/* when returns >0, the caller should retry */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
struct io_wait_queue *iowq,
- ktime_t timeout)
+ ktime_t *timeout)
{
int ret;
unsigned long check_cq;
@@ -2468,9 +2486,16 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
return -EBADR;
}
- if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS))
+ if (!schedule_hrtimeout(timeout, HRTIMER_MODE_ABS))
return -ETIME;
- return 1;
+
+ /*
+ * Run task_work after scheduling. If we got woken because of
+ * task_work being processed, run it now rather than let the caller
+ * do another wait loop.
+ */
+ ret = io_run_task_work_sig(ctx);
+ return ret < 0 ? ret : 1;
}
/*
@@ -2531,10 +2556,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
trace_io_uring_cqring_wait(ctx, min_events);
do {
- io_cqring_overflow_flush(ctx);
+ if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
+ finish_wait(&ctx->cq_wait, &iowq.wq);
+ io_cqring_do_overflow_flush(ctx);
+ }
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
TASK_INTERRUPTIBLE);
- ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
+ ret = io_cqring_wait_schedule(ctx, &iowq, &timeout);
+ if (__io_cqring_events_user(ctx) >= min_events)
+ break;
cond_resched();
} while (ret > 0);
@@ -3642,7 +3672,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
&& !(ctx->flags & IORING_SETUP_R_DISABLED))
- ctx->submitter_task = get_task_struct(current);
+ WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
file = io_uring_get_file(ctx);
if (IS_ERR(file)) {
@@ -3836,7 +3866,7 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx)
return -EBADFD;
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task)
- ctx->submitter_task = get_task_struct(current);
+ WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
if (ctx->restrictions.registered)
ctx->restricted = 1;
@@ -3993,8 +4023,6 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
return -EEXIST;
if (ctx->restricted) {
- if (opcode >= IORING_REGISTER_LAST)
- return -EINVAL;
opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
if (!test_bit(opcode, ctx->restrictions.register_op))
return -EACCES;
@@ -4150,6 +4178,9 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
long ret = -EBADF;
struct fd f;
+ if (opcode >= IORING_REGISTER_LAST)
+ return -EINVAL;
+
f = fdget(fd);
if (!f.file)
return -EBADF;