From d29cb3726f03cdac7889f0109a7cb84f79e168a8 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 7 Aug 2024 15:18:13 +0100 Subject: io_uring: add absolute mode wait timeouts In addition to current relative timeouts for the waiting loop, where the timespec argument specifies the maximum time it can wait for, add support for the absolute mode, with the value carrying a CLOCK_MONOTONIC absolute time until which we should return control back to the user. Suggested-by: Lewis Baker Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/4d5b74d67ada882590b2e42aa3aa7117bbf6b55f.1723039801.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 1 + 1 file changed, 1 insertion(+) (limited to 'include/uapi/linux') diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index adc2524fd8e3..6a81f55fcd0d 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -507,6 +507,7 @@ struct io_cqring_offsets { #define IORING_ENTER_SQ_WAIT (1U << 2) #define IORING_ENTER_EXT_ARG (1U << 3) #define IORING_ENTER_REGISTERED_RING (1U << 4) +#define IORING_ENTER_ABS_TIMER (1U << 5) /* * Passed in for io_uring_setup(2). Copied back with updated info on success -- cgit v1.2.3 From 2b8e976b984278edbeab3251d370e76d237699f9 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 7 Aug 2024 15:18:14 +0100 Subject: io_uring: user registered clockid for wait timeouts Add a new registration opcode IORING_REGISTER_CLOCK, which allows the user to select which clock id it wants to use with CQ waiting timeouts. It only allows a subset of all posix clocks and currently supports CLOCK_MONOTONIC and CLOCK_BOOTTIME. Suggested-by: Lewis Baker Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/98f2bc8a3c36cdf8f0e6a275245e81e903459703.1723039801.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- include/linux/io_uring_types.h | 3 +++ include/uapi/linux/io_uring.h | 7 +++++++ io_uring/io_uring.c | 8 ++++++-- io_uring/io_uring.h | 8 ++++++++ io_uring/napi.c | 2 +- io_uring/register.c | 31 +++++++++++++++++++++++++++++++ 6 files changed, 56 insertions(+), 3 deletions(-) (limited to 'include/uapi/linux') diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 3315005df117..4b9ba523978d 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -239,6 +239,9 @@ struct io_ring_ctx { struct io_rings *rings; struct percpu_ref refs; + clockid_t clockid; + enum tk_offsets clock_offset; + enum task_work_notify_mode notify_method; unsigned sq_thread_idle; } ____cacheline_aligned_in_smp; diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 6a81f55fcd0d..7af716136df9 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -596,6 +596,8 @@ enum io_uring_register_op { IORING_REGISTER_NAPI = 27, IORING_UNREGISTER_NAPI = 28, + IORING_REGISTER_CLOCK = 29, + /* this goes last */ IORING_REGISTER_LAST, @@ -676,6 +678,11 @@ struct io_uring_restriction { __u32 resv2[3]; }; +struct io_uring_clock_register { + __u32 clockid; + __u32 __resv[3]; +}; + struct io_uring_buf { __u64 addr; __u32 len; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 5282f9887440..20229e72b65c 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2377,7 +2377,8 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, ret = 0; if (iowq->timeout == KTIME_MAX) schedule(); - else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS)) + else if (!schedule_hrtimeout_range_clock(&iowq->timeout, 0, + HRTIMER_MODE_ABS, ctx->clockid)) ret = -ETIME; current->in_iowait = 0; return ret; @@ -2422,7 +2423,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.timeout = timespec64_to_ktime(ts); if (!(flags & IORING_ENTER_ABS_TIMER)) - iowq.timeout = ktime_add(iowq.timeout, ktime_get()); + iowq.timeout = ktime_add(iowq.timeout, io_get_time(ctx)); } if (sig) { @@ -3424,6 +3425,9 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (!ctx) return -ENOMEM; + ctx->clockid = CLOCK_MONOTONIC; + ctx->clock_offset = 0; + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && !(ctx->flags & IORING_SETUP_IOPOLL) && !(ctx->flags & IORING_SETUP_SQPOLL)) diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index c2acf6180845..9935819f12b7 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -437,6 +437,14 @@ static inline bool io_file_can_poll(struct io_kiocb *req) return false; } +static inline ktime_t io_get_time(struct io_ring_ctx *ctx) +{ + if (ctx->clockid == CLOCK_MONOTONIC) + return ktime_get(); + + return ktime_get_with_offset(ctx->clock_offset); +} + enum { IO_CHECK_CQ_OVERFLOW_BIT, IO_CHECK_CQ_DROPPED_BIT, diff --git a/io_uring/napi.c b/io_uring/napi.c index d78fcbecdd27..d0cf694d0172 100644 --- a/io_uring/napi.c +++ b/io_uring/napi.c @@ -283,7 +283,7 @@ void __io_napi_busy_loop(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) iowq->napi_busy_poll_dt = READ_ONCE(ctx->napi_busy_poll_dt); if (iowq->timeout != KTIME_MAX) { - ktime_t dt = ktime_sub(iowq->timeout, ktime_get()); + ktime_t dt = ktime_sub(iowq->timeout, io_get_time(ctx)); iowq->napi_busy_poll_dt = min_t(u64, iowq->napi_busy_poll_dt, dt); } diff --git a/io_uring/register.c b/io_uring/register.c index e3c20be5a198..57cb85c42526 100644 --- a/io_uring/register.c +++ b/io_uring/register.c @@ -335,6 +335,31 @@ err: return ret; } +static int io_register_clock(struct io_ring_ctx *ctx, + struct io_uring_clock_register __user *arg) +{ + struct io_uring_clock_register reg; + + if (copy_from_user(®, arg, sizeof(reg))) + return -EFAULT; + if (memchr_inv(®.__resv, 0, sizeof(reg.__resv))) + return -EINVAL; + + switch (reg.clockid) { + case CLOCK_MONOTONIC: + ctx->clock_offset = 0; + break; + case CLOCK_BOOTTIME: + ctx->clock_offset = TK_OFFS_BOOT; + break; + default: + return -EINVAL; + } + + ctx->clockid = reg.clockid; + return 0; +} + static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, void __user *arg, unsigned nr_args) __releases(ctx->uring_lock) @@ -511,6 +536,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; ret = io_unregister_napi(ctx, arg); break; + case IORING_REGISTER_CLOCK: + ret = -EINVAL; + if (!arg || nr_args) + break; + ret = io_register_clock(ctx, arg); + break; default: ret = -EINVAL; break; -- cgit v1.2.3 From 7ed9e09e2d13d5d43385153bba4734cb0eafd7fd Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 4 Jan 2024 10:46:30 -0700 Subject: io_uring: wire up min batch wake timeout Expose min_wait_usec in io_uring_getevents_arg, replacing the pad member that is currently in there. The value is in usecs, which is explained in the name as well. Note that if min_wait_usec and a normal timeout is used in conjunction, the normal timeout is still relative to the base time. For example, if min_wait_usec is set to 100 and the normal timeout is 1000, the max total time waited is still 1000. This also means that if the normal timeout is shorter than min_wait_usec, then only the min_wait_usec will take effect. See previous commit for an explanation of how this works. IORING_FEAT_MIN_TIMEOUT is added as a feature flag for this, as applications doing submit_and_wait_timeout() style operations will generally not see the -EINVAL from the wait side as they return the number of IOs submitted. Only if no IOs are submitted will the -EINVAL bubble back up to the application. Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 3 ++- io_uring/io_uring.c | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) (limited to 'include/uapi/linux') diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 7af716136df9..042eab793e26 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -543,6 +543,7 @@ struct io_uring_params { #define IORING_FEAT_LINKED_FILE (1U << 12) #define IORING_FEAT_REG_REG_RING (1U << 13) #define IORING_FEAT_RECVSEND_BUNDLE (1U << 14) +#define IORING_FEAT_MIN_TIMEOUT (1U << 15) /* * io_uring_register(2) opcodes and arguments @@ -766,7 +767,7 @@ enum io_uring_register_restriction_op { struct io_uring_getevents_arg { __u64 sigmask; __u32 sigmask_sz; - __u32 pad; + __u32 min_wait_usec; __u64 ts; }; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 968cd5fb3f79..80bb6e2374e9 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2475,6 +2475,7 @@ struct ext_arg { size_t argsz; struct __kernel_timespec __user *ts; const sigset_t __user *sig; + ktime_t min_time; }; /* @@ -2508,7 +2509,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail); iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); iowq.hit_timeout = 0; - iowq.min_timeout = 0; + iowq.min_timeout = ext_arg->min_time; iowq.timeout = KTIME_MAX; start_time = io_get_time(ctx); @@ -3239,8 +3240,7 @@ static int io_get_ext_arg(unsigned flags, const void __user *argp, return -EINVAL; if (copy_from_user(&arg, argp, sizeof(arg))) return -EFAULT; - if (arg.pad) - return -EINVAL; + ext_arg->min_time = arg.min_wait_usec * NSEC_PER_USEC; ext_arg->sig = u64_to_user_ptr(arg.sigmask); ext_arg->argsz = arg.sigmask_sz; ext_arg->ts = u64_to_user_ptr(arg.ts); @@ -3641,7 +3641,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS | IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP | IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING | - IORING_FEAT_RECVSEND_BUNDLE; + IORING_FEAT_RECVSEND_BUNDLE | IORING_FEAT_MIN_TIMEOUT; if (copy_to_user(params, p, sizeof(*p))) { ret = -EFAULT; -- cgit v1.2.3 From ae98dbf43d755b4e111fcd086e53939bef3e9a1a Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 9 Aug 2024 11:20:45 -0600 Subject: io_uring/kbuf: add support for incremental buffer consumption By default, any recv/read operation that uses provided buffers will consume at least 1 buffer fully (and maybe more, in case of bundles). This adds support for incremental consumption, meaning that an application may add large buffers, and each read/recv will just consume the part of the buffer that it needs. For example, let's say an application registers 1MB buffers in a provided buffer ring, for streaming receives. If it gets a short recv, then the full 1MB buffer will be consumed and passed back to the application. With incremental consumption, only the part that was actually used is consumed, and the buffer remains the current one. This means that both the application and the kernel needs to keep track of what the current receive point is. Each recv will still pass back a buffer ID and the size consumed, the only difference is that before the next receive would always be the next buffer in the ring. Now the same buffer ID may return multiple receives, each at an offset into that buffer from where the previous receive left off. Example: Application registers a provided buffer ring, and adds two 32K buffers to the ring. Buffer1 address: 0x1000000 (buffer ID 0) Buffer2 address: 0x2000000 (buffer ID 1) A recv completion is received with the following values: cqe->res 0x1000 (4k bytes received) cqe->flags 0x11 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 0) and the application now knows that 4096b of data is available at 0x1000000, the start of that buffer, and that more data from this buffer will be coming. Now the next receive comes in: cqe->res 0x2010 (8k bytes received) cqe->flags 0x11 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 0) which tells the application that 8k is available where the last completion left off, at 0x1001000. Next completion is: cqe->res 0x5000 (20k bytes received) cqe->flags 0x1 (CQE_F_BUFFER set, buffer ID 0) and the application now knows that 20k of data is available at 0x1003000, which is where the previous receive ended. CQE_F_BUF_MORE isn't set, as no more data is available in this buffer ID. The next completion is then: cqe->res 0x1000 (4k bytes received) cqe->flags 0x10001 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 1) which tells the application that buffer ID 1 is now the current one, hence there's 4k of valid data at 0x2000000. 0x2001000 will be the next receive point for this buffer ID. When a buffer will be reused by future CQE completions, IORING_CQE_BUF_MORE will be set in cqe->flags. This tells the application that the kernel isn't done with the buffer yet, and that it should expect more completions for this buffer ID. Will only be set by provided buffer rings setup with IOU_PBUF_RING INC, as that's the only type of buffer that will see multiple consecutive completions for the same buffer ID. For any other provided buffer type, any completion that passes back a buffer to the application is final. Once a buffer has been fully consumed, the buffer ring head is incremented and the next receive will indicate the next buffer ID in the CQE cflags. On the send side, the application can manage how much data is sent from an existing buffer by setting sqe->len to the desired send length. An application can request incremental consumption by setting IOU_PBUF_RING_INC in the provided buffer ring registration. Outside of that, any provided buffer ring setup and buffer additions is done like before, no changes there. The only change is in how an application may see multiple completions for the same buffer ID, hence needing to know where the next receive will happen. Note that like existing provided buffer rings, this should not be used with IOSQE_ASYNC, as both really require the ring to remain locked over the duration of the buffer selection and the operation completion. It will consume a buffer otherwise regardless of the size of the IO done. Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 18 ++++++++++++++++++ io_uring/kbuf.c | 42 ++++++++++++++++++++++++++++++------------ io_uring/kbuf.h | 42 ++++++++++++++++++++++++++++++++++-------- 3 files changed, 82 insertions(+), 20 deletions(-) (limited to 'include/uapi/linux') diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 042eab793e26..a275f91d2ac0 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -440,11 +440,21 @@ struct io_uring_cqe { * IORING_CQE_F_SOCK_NONEMPTY If set, more data to read after socket recv * IORING_CQE_F_NOTIF Set for notification CQEs. Can be used to distinct * them from sends. + * IORING_CQE_F_BUF_MORE If set, the buffer ID set in the completion will get + * more completions. In other words, the buffer is being + * partially consumed, and will be used by the kernel for + * more completions. This is only set for buffers used via + * the incremental buffer consumption, as provided by + * a ring buffer setup with IOU_PBUF_RING_INC. For any + * other provided buffer type, all completions with a + * buffer passed back is automatically returned to the + * application. */ #define IORING_CQE_F_BUFFER (1U << 0) #define IORING_CQE_F_MORE (1U << 1) #define IORING_CQE_F_SOCK_NONEMPTY (1U << 2) #define IORING_CQE_F_NOTIF (1U << 3) +#define IORING_CQE_F_BUF_MORE (1U << 4) #define IORING_CQE_BUFFER_SHIFT 16 @@ -716,9 +726,17 @@ struct io_uring_buf_ring { * mmap(2) with the offset set as: * IORING_OFF_PBUF_RING | (bgid << IORING_OFF_PBUF_SHIFT) * to get a virtual mapping for the ring. + * IOU_PBUF_RING_INC: If set, buffers consumed from this buffer ring can be + * consumed incrementally. Normally one (or more) buffers + * are fully consumed. With incremental consumptions, it's + * feasible to register big ranges of buffers, and each + * use of it will consume only as much as it needs. This + * requires that both the kernel and application keep + * track of where the current read/recv index is at. */ enum io_uring_register_pbuf_ring_flags { IOU_PBUF_RING_MMAP = 1, + IOU_PBUF_RING_INC = 2, }; /* argument for IORING_(UN)REGISTER_PBUF_RING */ diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 55d01861d8c5..1f503bcc9c9f 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -212,14 +212,25 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg, buf = io_ring_head_to_buf(br, head, bl->mask); if (arg->max_len) { u32 len = READ_ONCE(buf->len); - size_t needed; if (unlikely(!len)) return -ENOBUFS; - needed = (arg->max_len + len - 1) / len; - needed = min_not_zero(needed, (size_t) PEEK_MAX_IMPORT); - if (nr_avail > needed) - nr_avail = needed; + /* + * Limit incremental buffers to 1 segment. No point trying + * to peek ahead and map more than we need, when the buffers + * themselves should be large when setup with + * IOU_PBUF_RING_INC. + */ + if (bl->flags & IOBL_INC) { + nr_avail = 1; + } else { + size_t needed; + + needed = (arg->max_len + len - 1) / len; + needed = min_not_zero(needed, (size_t) PEEK_MAX_IMPORT); + if (nr_avail > needed) + nr_avail = needed; + } } /* @@ -244,16 +255,21 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg, req->buf_index = buf->bid; do { - /* truncate end piece, if needed */ - if (buf->len > arg->max_len) - buf->len = arg->max_len; + u32 len = buf->len; + + /* truncate end piece, if needed, for non partial buffers */ + if (len > arg->max_len) { + len = arg->max_len; + if (!(bl->flags & IOBL_INC)) + buf->len = len; + } iov->iov_base = u64_to_user_ptr(buf->addr); - iov->iov_len = buf->len; + iov->iov_len = len; iov++; - arg->out_len += buf->len; - arg->max_len -= buf->len; + arg->out_len += len; + arg->max_len -= len; if (!arg->max_len) break; @@ -675,7 +691,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) if (reg.resv[0] || reg.resv[1] || reg.resv[2]) return -EINVAL; - if (reg.flags & ~IOU_PBUF_RING_MMAP) + if (reg.flags & ~(IOU_PBUF_RING_MMAP | IOU_PBUF_RING_INC)) return -EINVAL; if (!(reg.flags & IOU_PBUF_RING_MMAP)) { if (!reg.ring_addr) @@ -713,6 +729,8 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) if (!ret) { bl->nr_entries = reg.ring_entries; bl->mask = reg.ring_entries - 1; + if (reg.flags & IOU_PBUF_RING_INC) + bl->flags |= IOBL_INC; io_buffer_add_list(ctx, bl, reg.bgid); return 0; diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index b41e2a0a0505..36aadfe5ac00 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -9,6 +9,9 @@ enum { IOBL_BUF_RING = 1, /* ring mapped provided buffers, but mmap'ed by application */ IOBL_MMAP = 2, + /* buffers are consumed incrementally rather than always fully */ + IOBL_INC = 4, + }; struct io_buffer_list { @@ -124,24 +127,45 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags) /* Mapped buffer ring, return io_uring_buf from head */ #define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)] -static inline void io_kbuf_commit(struct io_kiocb *req, +static inline bool io_kbuf_commit(struct io_kiocb *req, struct io_buffer_list *bl, int len, int nr) { if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT))) - return; - bl->head += nr; + return true; + req->flags &= ~REQ_F_BUFFERS_COMMIT; + + if (unlikely(len < 0)) + return true; + + if (bl->flags & IOBL_INC) { + struct io_uring_buf *buf; + + buf = io_ring_head_to_buf(bl->buf_ring, bl->head, bl->mask); + if (WARN_ON_ONCE(len > buf->len)) + len = buf->len; + buf->len -= len; + if (buf->len) { + buf->addr += len; + return false; + } + } + + bl->head += nr; + return true; } -static inline void __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr) +static inline bool __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr) { struct io_buffer_list *bl = req->buf_list; + bool ret = true; if (bl) { - io_kbuf_commit(req, bl, len, nr); + ret = io_kbuf_commit(req, bl, len, nr); req->buf_index = bl->bgid; } req->flags &= ~REQ_F_BUFFER_RING; + return ret; } static inline void __io_put_kbuf_list(struct io_kiocb *req, int len, @@ -176,10 +200,12 @@ static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int len, return 0; ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT); - if (req->flags & REQ_F_BUFFER_RING) - __io_put_kbuf_ring(req, len, nbufs); - else + if (req->flags & REQ_F_BUFFER_RING) { + if (!__io_put_kbuf_ring(req, len, nbufs)) + ret |= IORING_CQE_F_BUF_MORE; + } else { __io_put_kbuf(req, len, issue_flags); + } return ret; } -- cgit v1.2.3 From 7cc2a6eadcd7a5aa36ac63e6659f5c6138c7f4d2 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 11 Sep 2024 13:56:08 -0600 Subject: io_uring: add IORING_REGISTER_COPY_BUFFERS method Buffers can get registered with io_uring, which allows to skip the repeated pin_pages, unpin/unref pages for each O_DIRECT operation. This reduces the overhead of O_DIRECT IO. However, registrering buffers can take some time. Normally this isn't an issue as it's done at initialization time (and hence less critical), but for cases where rings can be created and destroyed as part of an IO thread pool, registering the same buffers for multiple rings become a more time sensitive proposition. As an example, let's say an application has an IO memory pool of 500G. Initial registration takes: Got 500 huge pages (each 1024MB) Registered 500 pages in 409 msec or about 0.4 seconds. If we go higher to 900 1GB huge pages being registered: Registered 900 pages in 738 msec which is, as expected, a fully linear scaling. Rather than have each ring pin/map/register the same buffer pool, provide an io_uring_register(2) opcode to simply duplicate the buffers that are registered with another ring. Adding the same 900GB of registered buffers to the target ring can then be accomplished in: Copied 900 pages in 17 usec While timing differs a bit, this provides around a 25,000-40,000x speedup for this use case. Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 13 +++++++ io_uring/register.c | 6 +++ io_uring/rsrc.c | 91 +++++++++++++++++++++++++++++++++++++++++++ io_uring/rsrc.h | 1 + 4 files changed, 111 insertions(+) (limited to 'include/uapi/linux') diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index a275f91d2ac0..9dc5bb428c8a 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -609,6 +609,9 @@ enum io_uring_register_op { IORING_REGISTER_CLOCK = 29, + /* copy registered buffers from source ring to current ring */ + IORING_REGISTER_COPY_BUFFERS = 30, + /* this goes last */ IORING_REGISTER_LAST, @@ -694,6 +697,16 @@ struct io_uring_clock_register { __u32 __resv[3]; }; +enum { + IORING_REGISTER_SRC_REGISTERED = 1, +}; + +struct io_uring_copy_buffers { + __u32 src_fd; + __u32 flags; + __u32 pad[6]; +}; + struct io_uring_buf { __u64 addr; __u32 len; diff --git a/io_uring/register.c b/io_uring/register.c index d90159478045..dab0f8024ddf 100644 --- a/io_uring/register.c +++ b/io_uring/register.c @@ -542,6 +542,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; ret = io_register_clock(ctx, arg); break; + case IORING_REGISTER_COPY_BUFFERS: + ret = -EINVAL; + if (!arg || nr_args != 1) + break; + ret = io_register_copy_buffers(ctx, arg); + break; default: ret = -EINVAL; break; diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index 28f98de3c304..40696a395f0a 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -17,6 +17,7 @@ #include "openclose.h" #include "rsrc.h" #include "memmap.h" +#include "register.h" struct io_rsrc_update { struct file *file; @@ -1137,3 +1138,93 @@ int io_import_fixed(int ddir, struct iov_iter *iter, return 0; } + +static int io_copy_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx) +{ + struct io_mapped_ubuf **user_bufs; + struct io_rsrc_data *data; + int i, ret, nbufs; + + /* + * Drop our own lock here. We'll setup the data we need and reference + * the source buffers, then re-grab, check, and assign at the end. + */ + mutex_unlock(&ctx->uring_lock); + + mutex_lock(&src_ctx->uring_lock); + ret = -ENXIO; + nbufs = src_ctx->nr_user_bufs; + if (!nbufs) + goto out_unlock; + ret = io_rsrc_data_alloc(ctx, IORING_RSRC_BUFFER, NULL, nbufs, &data); + if (ret) + goto out_unlock; + + ret = -ENOMEM; + user_bufs = kcalloc(nbufs, sizeof(*ctx->user_bufs), GFP_KERNEL); + if (!user_bufs) + goto out_free_data; + + for (i = 0; i < nbufs; i++) { + struct io_mapped_ubuf *src = src_ctx->user_bufs[i]; + + refcount_inc(&src->refs); + user_bufs[i] = src; + } + + /* Have a ref on the bufs now, drop src lock and re-grab our own lock */ + mutex_unlock(&src_ctx->uring_lock); + mutex_lock(&ctx->uring_lock); + if (!ctx->user_bufs) { + ctx->user_bufs = user_bufs; + ctx->buf_data = data; + ctx->nr_user_bufs = nbufs; + return 0; + } + + /* someone raced setting up buffers, dump ours */ + for (i = 0; i < nbufs; i++) + io_buffer_unmap(ctx, &user_bufs[i]); + io_rsrc_data_free(data); + kfree(user_bufs); + return -EBUSY; +out_free_data: + io_rsrc_data_free(data); +out_unlock: + mutex_unlock(&src_ctx->uring_lock); + mutex_lock(&ctx->uring_lock); + return ret; +} + +/* + * Copy the registered buffers from the source ring whose file descriptor + * is given in the src_fd to the current ring. This is identical to registering + * the buffers with ctx, except faster as mappings already exist. + * + * Since the memory is already accounted once, don't account it again. + */ +int io_register_copy_buffers(struct io_ring_ctx *ctx, void __user *arg) +{ + struct io_uring_copy_buffers buf; + bool registered_src; + struct file *file; + int ret; + + if (ctx->user_bufs || ctx->nr_user_bufs) + return -EBUSY; + if (copy_from_user(&buf, arg, sizeof(buf))) + return -EFAULT; + if (buf.flags & ~IORING_REGISTER_SRC_REGISTERED) + return -EINVAL; + if (memchr_inv(buf.pad, 0, sizeof(buf.pad))) + return -EINVAL; + + registered_src = (buf.flags & IORING_REGISTER_SRC_REGISTERED) != 0; + file = io_uring_register_get_file(buf.src_fd, registered_src); + if (IS_ERR(file)) + return PTR_ERR(file); + ret = io_copy_buffers(ctx, file->private_data); + if (!registered_src) + fput(file); + return ret; +} diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index 98a253172c27..93546ab337a6 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -68,6 +68,7 @@ int io_import_fixed(int ddir, struct iov_iter *iter, struct io_mapped_ubuf *imu, u64 buf_addr, size_t len); +int io_register_copy_buffers(struct io_ring_ctx *ctx, void __user *arg); void __io_sqe_buffers_unregister(struct io_ring_ctx *ctx); int io_sqe_buffers_unregister(struct io_ring_ctx *ctx); int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg, -- cgit v1.2.3