diff options
-rw-r--r-- | include/uapi/linux/io_uring.h | 2 | ||||
-rw-r--r-- | io_uring/io_uring.h | 10 | ||||
-rw-r--r-- | io_uring/net.c | 72 | ||||
-rw-r--r-- | io_uring/opdef.c | 16 | ||||
-rw-r--r-- | io_uring/zcrx.c | 190 | ||||
-rw-r--r-- | io_uring/zcrx.h | 13 |
6 files changed, 302 insertions, 1 deletions
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 44844707d327..05d6255b0f6a 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -87,6 +87,7 @@ struct io_uring_sqe { union { __s32 splice_fd_in; __u32 file_index; + __u32 zcrx_ifq_idx; __u32 optlen; struct { __u16 addr_len; @@ -278,6 +279,7 @@ enum io_uring_op { IORING_OP_FTRUNCATE, IORING_OP_BIND, IORING_OP_LISTEN, + IORING_OP_RECV_ZC, /* this goes last, obviously */ IORING_OP_LAST, diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 6c46d9cdd7aa..650f81dac5d0 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -185,6 +185,16 @@ static inline bool io_get_cqe(struct io_ring_ctx *ctx, struct io_uring_cqe **ret return io_get_cqe_overflow(ctx, ret, false); } +static inline bool io_defer_get_uncommited_cqe(struct io_ring_ctx *ctx, + struct io_uring_cqe **cqe_ret) +{ + io_lockdep_assert_cq_locked(ctx); + + ctx->cq_extra++; + ctx->submit_state.cq_flush = true; + return io_get_cqe(ctx, cqe_ret); +} + static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx, struct io_kiocb *req) { diff --git a/io_uring/net.c b/io_uring/net.c index 10344b3a6d89..260eb73a5854 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -16,6 +16,7 @@ #include "net.h" #include "notif.h" #include "rsrc.h" +#include "zcrx.h" #if defined(CONFIG_NET) struct io_shutdown { @@ -89,6 +90,13 @@ struct io_sr_msg { */ #define MULTISHOT_MAX_RETRY 32 +struct io_recvzc { + struct file *file; + unsigned msg_flags; + u16 flags; + struct io_zcrx_ifq *ifq; +}; + int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_shutdown *shutdown = io_kiocb_to_cmd(req, struct io_shutdown); @@ -1227,6 +1235,70 @@ out_free: return ret; } +int io_recvzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc); + unsigned ifq_idx; + + if (unlikely(sqe->file_index || sqe->addr2 || sqe->addr || + sqe->len || sqe->addr3)) + return -EINVAL; + + ifq_idx = READ_ONCE(sqe->zcrx_ifq_idx); + if (ifq_idx != 0) + return -EINVAL; + zc->ifq = req->ctx->ifq; + if (!zc->ifq) + return -EINVAL; + + zc->flags = READ_ONCE(sqe->ioprio); + zc->msg_flags = READ_ONCE(sqe->msg_flags); + if (zc->msg_flags) + return -EINVAL; + if (zc->flags & ~(IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT)) + return -EINVAL; + /* multishot required */ + if (!(zc->flags & IORING_RECV_MULTISHOT)) + return -EINVAL; + /* All data completions are posted as aux CQEs. */ + req->flags |= REQ_F_APOLL_MULTISHOT; + + return 0; +} + +int io_recvzc(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc); + struct socket *sock; + int ret; + + if (!(req->flags & REQ_F_POLLED) && + (zc->flags & IORING_RECVSEND_POLL_FIRST)) + return -EAGAIN; + + sock = sock_from_file(req->file); + if (unlikely(!sock)) + return -ENOTSOCK; + + ret = io_zcrx_recv(req, zc->ifq, sock, zc->msg_flags | MSG_DONTWAIT, + issue_flags); + if (unlikely(ret <= 0) && ret != -EAGAIN) { + if (ret == -ERESTARTSYS) + ret = -EINTR; + + req_set_fail(req); + io_req_set_res(req, ret, 0); + + if (issue_flags & IO_URING_F_MULTISHOT) + return IOU_STOP_MULTISHOT; + return IOU_OK; + } + + if (issue_flags & IO_URING_F_MULTISHOT) + return IOU_ISSUE_SKIP_COMPLETE; + return -EAGAIN; +} + void io_send_zc_cleanup(struct io_kiocb *req) { struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg); diff --git a/io_uring/opdef.c b/io_uring/opdef.c index e8baef4e5146..89f50ecadeaf 100644 --- a/io_uring/opdef.c +++ b/io_uring/opdef.c @@ -37,6 +37,7 @@ #include "waitid.h" #include "futex.h" #include "truncate.h" +#include "zcrx.h" static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags) { @@ -516,6 +517,18 @@ const struct io_issue_def io_issue_defs[] = { .prep = io_eopnotsupp_prep, #endif }, + [IORING_OP_RECV_ZC] = { + .needs_file = 1, + .unbound_nonreg_file = 1, + .pollin = 1, + .ioprio = 1, +#if defined(CONFIG_NET) + .prep = io_recvzc_prep, + .issue = io_recvzc, +#else + .prep = io_eopnotsupp_prep, +#endif + }, }; const struct io_cold_def io_cold_defs[] = { @@ -745,6 +758,9 @@ const struct io_cold_def io_cold_defs[] = { [IORING_OP_LISTEN] = { .name = "LISTEN", }, + [IORING_OP_RECV_ZC] = { + .name = "RECV_ZC", + }, }; const char *io_uring_get_opcode(u8 opcode) diff --git a/io_uring/zcrx.c b/io_uring/zcrx.c index 9d14fdf7a568..8833879d94ba 100644 --- a/io_uring/zcrx.c +++ b/io_uring/zcrx.c @@ -13,6 +13,8 @@ #include <net/netlink.h> #include <trace/events/page_pool.h> +#include <net/tcp.h> +#include <net/rps.h> #include <uapi/linux/io_uring.h> @@ -91,7 +93,12 @@ static void io_zcrx_sync_for_device(const struct page_pool *pool, #define IO_RQ_MAX_ENTRIES 32768 -__maybe_unused +struct io_zcrx_args { + struct io_kiocb *req; + struct io_zcrx_ifq *ifq; + struct socket *sock; +}; + static const struct memory_provider_ops io_uring_pp_zc_ops; static inline struct io_zcrx_area *io_zcrx_iov_to_area(const struct net_iov *niov) @@ -118,6 +125,11 @@ static bool io_zcrx_put_niov_uref(struct net_iov *niov) return true; } +static void io_zcrx_get_niov_uref(struct net_iov *niov) +{ + atomic_inc(io_get_user_counter(niov)); +} + static int io_allocate_rbuf_ring(struct io_zcrx_ifq *ifq, struct io_uring_zcrx_ifq_reg *reg, struct io_uring_region_desc *rd) @@ -614,3 +626,179 @@ static const struct memory_provider_ops io_uring_pp_zc_ops = { .nl_fill = io_pp_nl_fill, .uninstall = io_pp_uninstall, }; + +static bool io_zcrx_queue_cqe(struct io_kiocb *req, struct net_iov *niov, + struct io_zcrx_ifq *ifq, int off, int len) +{ + struct io_uring_zcrx_cqe *rcqe; + struct io_zcrx_area *area; + struct io_uring_cqe *cqe; + u64 offset; + + if (!io_defer_get_uncommited_cqe(req->ctx, &cqe)) + return false; + + cqe->user_data = req->cqe.user_data; + cqe->res = len; + cqe->flags = IORING_CQE_F_MORE; + + area = io_zcrx_iov_to_area(niov); + offset = off + (net_iov_idx(niov) << PAGE_SHIFT); + rcqe = (struct io_uring_zcrx_cqe *)(cqe + 1); + rcqe->off = offset + ((u64)area->area_id << IORING_ZCRX_AREA_SHIFT); + rcqe->__pad = 0; + return true; +} + +static int io_zcrx_recv_frag(struct io_kiocb *req, struct io_zcrx_ifq *ifq, + const skb_frag_t *frag, int off, int len) +{ + struct net_iov *niov; + + if (unlikely(!skb_frag_is_net_iov(frag))) + return -EOPNOTSUPP; + + niov = netmem_to_net_iov(frag->netmem); + if (niov->pp->mp_ops != &io_uring_pp_zc_ops || + niov->pp->mp_priv != ifq) + return -EFAULT; + + if (!io_zcrx_queue_cqe(req, niov, ifq, off + skb_frag_off(frag), len)) + return -ENOSPC; + + /* + * Prevent it from being recycled while user is accessing it. + * It has to be done before grabbing a user reference. + */ + page_pool_ref_netmem(net_iov_to_netmem(niov)); + io_zcrx_get_niov_uref(niov); + return len; +} + +static int +io_zcrx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb, + unsigned int offset, size_t len) +{ + struct io_zcrx_args *args = desc->arg.data; + struct io_zcrx_ifq *ifq = args->ifq; + struct io_kiocb *req = args->req; + struct sk_buff *frag_iter; + unsigned start, start_off; + int i, copy, end, off; + int ret = 0; + + start = skb_headlen(skb); + start_off = offset; + + if (offset < start) + return -EOPNOTSUPP; + + for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) { + const skb_frag_t *frag; + + if (WARN_ON(start > offset + len)) + return -EFAULT; + + frag = &skb_shinfo(skb)->frags[i]; + end = start + skb_frag_size(frag); + + if (offset < end) { + copy = end - offset; + if (copy > len) + copy = len; + + off = offset - start; + ret = io_zcrx_recv_frag(req, ifq, frag, off, copy); + if (ret < 0) + goto out; + + offset += ret; + len -= ret; + if (len == 0 || ret != copy) + goto out; + } + start = end; + } + + skb_walk_frags(skb, frag_iter) { + if (WARN_ON(start > offset + len)) + return -EFAULT; + + end = start + frag_iter->len; + if (offset < end) { + copy = end - offset; + if (copy > len) + copy = len; + + off = offset - start; + ret = io_zcrx_recv_skb(desc, frag_iter, off, copy); + if (ret < 0) + goto out; + + offset += ret; + len -= ret; + if (len == 0 || ret != copy) + goto out; + } + start = end; + } + +out: + if (offset == start_off) + return ret; + return offset - start_off; +} + +static int io_zcrx_tcp_recvmsg(struct io_kiocb *req, struct io_zcrx_ifq *ifq, + struct sock *sk, int flags, + unsigned issue_flags) +{ + struct io_zcrx_args args = { + .req = req, + .ifq = ifq, + .sock = sk->sk_socket, + }; + read_descriptor_t rd_desc = { + .count = 1, + .arg.data = &args, + }; + int ret; + + lock_sock(sk); + ret = tcp_read_sock(sk, &rd_desc, io_zcrx_recv_skb); + if (ret <= 0) { + if (ret < 0 || sock_flag(sk, SOCK_DONE)) + goto out; + if (sk->sk_err) + ret = sock_error(sk); + else if (sk->sk_shutdown & RCV_SHUTDOWN) + goto out; + else if (sk->sk_state == TCP_CLOSE) + ret = -ENOTCONN; + else + ret = -EAGAIN; + } else if (sock_flag(sk, SOCK_DONE)) { + /* Make it to retry until it finally gets 0. */ + if (issue_flags & IO_URING_F_MULTISHOT) + ret = IOU_REQUEUE; + else + ret = -EAGAIN; + } +out: + release_sock(sk); + return ret; +} + +int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq, + struct socket *sock, unsigned int flags, + unsigned issue_flags) +{ + struct sock *sk = sock->sk; + const struct proto *prot = READ_ONCE(sk->sk_prot); + + if (prot->recvmsg != tcp_recvmsg) + return -EPROTONOSUPPORT; + + sock_rps_record_flow(sk); + return io_zcrx_tcp_recvmsg(req, ifq, sk, flags, issue_flags); +} diff --git a/io_uring/zcrx.h b/io_uring/zcrx.h index 1b6363591f72..a16bdd921f03 100644 --- a/io_uring/zcrx.h +++ b/io_uring/zcrx.h @@ -3,6 +3,7 @@ #define IOU_ZC_RX_H #include <linux/io_uring_types.h> +#include <linux/socket.h> #include <net/page_pool/types.h> #include <net/net_trackers.h> @@ -43,6 +44,9 @@ int io_register_zcrx_ifq(struct io_ring_ctx *ctx, struct io_uring_zcrx_ifq_reg __user *arg); void io_unregister_zcrx_ifqs(struct io_ring_ctx *ctx); void io_shutdown_zcrx_ifqs(struct io_ring_ctx *ctx); +int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq, + struct socket *sock, unsigned int flags, + unsigned issue_flags); #else static inline int io_register_zcrx_ifq(struct io_ring_ctx *ctx, struct io_uring_zcrx_ifq_reg __user *arg) @@ -55,6 +59,15 @@ static inline void io_unregister_zcrx_ifqs(struct io_ring_ctx *ctx) static inline void io_shutdown_zcrx_ifqs(struct io_ring_ctx *ctx) { } +static inline int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq, + struct socket *sock, unsigned int flags, + unsigned issue_flags) +{ + return -EOPNOTSUPP; +} #endif +int io_recvzc(struct io_kiocb *req, unsigned int issue_flags); +int io_recvzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); + #endif |