summaryrefslogtreecommitdiff
path: root/ipc/msg.c
diff options
context:
space:
mode:
Diffstat (limited to 'ipc/msg.c')
-rw-r--r--ipc/msg.c133
1 files changed, 40 insertions, 93 deletions
diff --git a/ipc/msg.c b/ipc/msg.c
index c6521c205cb4..b1fb06a6a75b 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -51,13 +51,7 @@ struct msg_receiver {
long r_msgtype;
long r_maxsize;
- /*
- * Mark r_msg volatile so that the compiler
- * does not try to get smart and optimize
- * it. We rely on this for the lockless
- * receive algorithm.
- */
- struct msg_msg *volatile r_msg;
+ struct msg_msg *r_msg;
};
/* one msg_sender for each sleeping sender */
@@ -183,21 +177,14 @@ static void ss_wakeup(struct list_head *h, int kill)
}
}
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+ struct wake_q_head *wake_q)
{
struct msg_receiver *msr, *t;
list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
- msr->r_msg = NULL; /* initialize expunge ordering */
- wake_up_process(msr->r_tsk);
- /*
- * Ensure that the wakeup is visible before setting r_msg as
- * the receiving end depends on it: either spinning on a nil,
- * or dealing with -EAGAIN cases. See lockless receive part 1
- * and 2 in do_msgrcv().
- */
- smp_wmb(); /* barrier (B) */
- msr->r_msg = ERR_PTR(res);
+ wake_q_add(wake_q, msr->r_tsk);
+ WRITE_ONCE(msr->r_msg, ERR_PTR(res));
}
}
@@ -213,11 +200,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
{
struct msg_msg *msg, *t;
struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+ WAKE_Q(wake_q);
- expunge_all(msq, -EIDRM);
+ expunge_all(msq, -EIDRM, &wake_q);
ss_wakeup(&msq->q_senders, 1);
msg_rmid(ns, msq);
ipc_unlock_object(&msq->q_perm);
+ wake_up_q(&wake_q);
rcu_read_unlock();
list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -342,6 +331,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
struct kern_ipc_perm *ipcp;
struct msqid64_ds uninitialized_var(msqid64);
struct msg_queue *msq;
+ WAKE_Q(wake_q);
int err;
if (cmd == IPC_SET) {
@@ -389,7 +379,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
/* sleeping receivers might be excluded by
* stricter permissions.
*/
- expunge_all(msq, -EAGAIN);
+ expunge_all(msq, -EAGAIN, &wake_q);
/* sleeping senders might be able to send
* due to a larger queue size.
*/
@@ -402,6 +392,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
out_unlock0:
ipc_unlock_object(&msq->q_perm);
+ wake_up_q(&wake_q);
out_unlock1:
rcu_read_unlock();
out_up:
@@ -566,7 +557,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
return 0;
}
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+ struct wake_q_head *wake_q)
{
struct msg_receiver *msr, *t;
@@ -577,27 +569,14 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
list_del(&msr->r_list);
if (msr->r_maxsize < msg->m_ts) {
- /* initialize pipelined send ordering */
- msr->r_msg = NULL;
- wake_up_process(msr->r_tsk);
- /* barrier (B) see barrier comment below */
- smp_wmb();
- msr->r_msg = ERR_PTR(-E2BIG);
+ wake_q_add(wake_q, msr->r_tsk);
+ WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
} else {
- msr->r_msg = NULL;
msq->q_lrpid = task_pid_vnr(msr->r_tsk);
msq->q_rtime = get_seconds();
- wake_up_process(msr->r_tsk);
- /*
- * Ensure that the wakeup is visible before
- * setting r_msg, as the receiving can otherwise
- * exit - once r_msg is set, the receiver can
- * continue. See lockless receive part 1 and 2
- * in do_msgrcv(). Barrier (B).
- */
- smp_wmb();
- msr->r_msg = msg;
+ wake_q_add(wake_q, msr->r_tsk);
+ WRITE_ONCE(msr->r_msg, msg);
return 1;
}
}
@@ -613,6 +592,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
struct msg_msg *msg;
int err;
struct ipc_namespace *ns;
+ WAKE_Q(wake_q);
ns = current->nsproxy->ipc_ns;
@@ -686,7 +666,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
err = -EIDRM;
goto out_unlock0;
}
-
ss_del(&s);
if (signal_pending(current)) {
@@ -698,7 +677,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
msq->q_lspid = task_tgid_vnr(current);
msq->q_stime = get_seconds();
- if (!pipelined_send(msq, msg)) {
+ if (!pipelined_send(msq, msg, &wake_q)) {
/* no one is waiting for this message, enqueue it */
list_add_tail(&msg->m_list, &msq->q_messages);
msq->q_cbytes += msgsz;
@@ -712,6 +691,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
out_unlock0:
ipc_unlock_object(&msq->q_perm);
+ wake_up_q(&wake_q);
out_unlock1:
rcu_read_unlock();
if (msg != NULL)
@@ -919,71 +899,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
rcu_read_unlock();
schedule();
- /* Lockless receive, part 1:
- * Disable preemption. We don't hold a reference to the queue
- * and getting a reference would defeat the idea of a lockless
- * operation, thus the code relies on rcu to guarantee the
- * existence of msq:
+ /*
+ * Lockless receive, part 1:
+ * We don't hold a reference to the queue and getting a
+ * reference would defeat the idea of a lockless operation,
+ * thus the code relies on rcu to guarantee the existence of
+ * msq:
* Prior to destruction, expunge_all(-EIRDM) changes r_msg.
* Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
- * rcu_read_lock() prevents preemption between reading r_msg
- * and acquiring the q_perm.lock in ipc_lock_object().
*/
rcu_read_lock();
- /* Lockless receive, part 2:
- * Wait until pipelined_send or expunge_all are outside of
- * wake_up_process(). There is a race with exit(), see
- * ipc/mqueue.c for the details. The correct serialization
- * ensures that a receiver cannot continue without the wakeup
- * being visibible _before_ setting r_msg:
- *
- * CPU 0 CPU 1
- * <loop receiver>
- * smp_rmb(); (A) <-- pair -. <waker thread>
- * <load ->r_msg> | msr->r_msg = NULL;
- * | wake_up_process();
- * <continue> `------> smp_wmb(); (B)
- * msr->r_msg = msg;
+ /*
+ * Lockless receive, part 2:
+ * The work in pipelined_send() and expunge_all():
+ * - Set pointer to message
+ * - Queue the receiver task for later wakeup
+ * - Wake up the process after the lock is dropped.
*
- * Where (A) orders the message value read and where (B) orders
- * the write to the r_msg -- done in both pipelined_send and
- * expunge_all.
- */
- for (;;) {
- /*
- * Pairs with writer barrier in pipelined_send
- * or expunge_all.
- */
- smp_rmb(); /* barrier (A) */
- msg = (struct msg_msg *)msr_d.r_msg;
- if (msg)
- break;
-
- /*
- * The cpu_relax() call is a compiler barrier
- * which forces everything in this loop to be
- * re-loaded.
- */
- cpu_relax();
- }
-
- /* Lockless receive, part 3:
- * If there is a message or an error then accept it without
- * locking.
+ * Should the process wake up before this wakeup (due to a
+ * signal) it will either see the message and continue ...
*/
+ msg = READ_ONCE(msr_d.r_msg);
if (msg != ERR_PTR(-EAGAIN))
goto out_unlock1;
- /* Lockless receive, part 3:
- * Acquire the queue spinlock.
- */
+ /*
+ * ... or see -EAGAIN, acquire the lock to check the message
+ * again.
+ */
ipc_lock_object(&msq->q_perm);
- /* Lockless receive, part 4:
- * Repeat test after acquiring the spinlock.
- */
- msg = (struct msg_msg *)msr_d.r_msg;
+ msg = msr_d.r_msg;
if (msg != ERR_PTR(-EAGAIN))
goto out_unlock0;