summaryrefslogtreecommitdiff
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2021-09-02 20:19:45 +0300
committerLinus Torvalds <torvalds@linux-foundation.org>2021-09-02 20:19:45 +0300
commit265113f70f3d63ae8b6eb1ce4303d14dbbd71b2d (patch)
tree354bd2e4d642fee66d3fe2351607e4f081bd48f1 /fs
parentb0cfcdd9b9672ea90642f33d6c0dd8516553adf2 (diff)
parentecd95673142ef80169a6c003b569b8a86d1e6329 (diff)
downloadlinux-265113f70f3d63ae8b6eb1ce4303d14dbbd71b2d.tar.xz
Merge tag 'dlm-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updates from David Teigland: "This set includes a number of minor fixes and cleanups related to the networking changes in the last release. A patch to delay ack messages reduces network traffic significantly" * tag 'dlm-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: fs: dlm: avoid comms shutdown delay in release_lockspace fs: dlm: fix return -EINTR on recovery stopped fs: dlm: implement delayed ack handling fs: dlm: move receive loop into receive handler fs: dlm: fix multiple empty writequeue alloc fs: dlm: generic connect func fs: dlm: auto load sctp module fs: dlm: introduce generic listen fs: dlm: move to static proto ops fs: dlm: introduce con_next_wq helper fs: dlm: cleanup and remove _send_rcom fs: dlm: clear CF_APP_LIMITED on close fs: dlm: fix typo in tlv prefix fs: dlm: use READ_ONCE for config var fs: dlm: use sk->sk_socket instead of con->sock
Diffstat (limited to 'fs')
-rw-r--r--fs/dlm/dir.c4
-rw-r--r--fs/dlm/dlm_internal.h2
-rw-r--r--fs/dlm/lockspace.c3
-rw-r--r--fs/dlm/lowcomms.c770
-rw-r--r--fs/dlm/lowcomms.h1
-rw-r--r--fs/dlm/member.c4
-rw-r--r--fs/dlm/midcomms.c56
-rw-r--r--fs/dlm/rcom.c29
-rw-r--r--fs/dlm/recoverd.c4
9 files changed, 458 insertions, 415 deletions
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 10c36ae1a8f9..45ebbe602bbf 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -85,8 +85,10 @@ int dlm_recover_directory(struct dlm_ls *ls)
for (;;) {
int left;
error = dlm_recovery_stopped(ls);
- if (error)
+ if (error) {
+ error = -EINTR;
goto out_free;
+ }
error = dlm_rcom_names(ls, memb->nodeid,
last_name, last_len);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 91d1ca3a121a..5f57538b5d45 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -468,7 +468,7 @@ struct dlm_rcom {
struct dlm_opt_header {
uint16_t t_type;
uint16_t t_length;
- uint32_t o_pad;
+ uint32_t t_pad;
/* need to be 8 byte aligned */
char t_value[];
};
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index d71aba8c3e64..10eddfa6c3d7 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -498,7 +498,7 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
DLM_LSFL_NEWEXCL));
- size = dlm_config.ci_rsbtbl_size;
+ size = READ_ONCE(dlm_config.ci_rsbtbl_size);
ls->ls_rsbtbl_size = size;
ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
@@ -793,6 +793,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
if (ls_count == 1) {
dlm_scand_stop();
+ dlm_clear_members(ls);
dlm_midcomms_shutdown();
}
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 0ea9ae35da0b..8f715c620e1f 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -84,9 +84,7 @@ struct connection {
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
atomic_t writequeue_cnt;
- void (*connect_action) (struct connection *); /* What to do to connect */
- void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
- bool (*eof_condition)(struct connection *con); /* What to do to eof check */
+ struct mutex wq_alloc;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@@ -145,6 +143,24 @@ struct dlm_node_addr {
struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
+struct dlm_proto_ops {
+ bool try_new_addr;
+ const char *name;
+ int proto;
+
+ int (*connect)(struct connection *con, struct socket *sock,
+ struct sockaddr *addr, int addr_len);
+ void (*sockopts)(struct socket *sock);
+ int (*bind)(struct socket *sock);
+ int (*listen_validate)(void);
+ void (*listen_sockopts)(struct socket *sock);
+ int (*listen_bind)(struct socket *sock);
+ /* What to do to shutdown */
+ void (*shutdown_action)(struct connection *con);
+ /* What to do to eof check */
+ bool (*eof_condition)(struct connection *con);
+};
+
static struct listen_sock_callbacks {
void (*sk_error_report)(struct sock *);
void (*sk_data_ready)(struct sock *);
@@ -168,12 +184,26 @@ static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);
+static const struct dlm_proto_ops *dlm_proto_ops;
+
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
-static void sctp_connect_to_sock(struct connection *con);
-static void tcp_connect_to_sock(struct connection *con);
-static void dlm_tcp_shutdown(struct connection *con);
+/* need to held writequeue_lock */
+static struct writequeue_entry *con_next_wq(struct connection *con)
+{
+ struct writequeue_entry *e;
+
+ if (list_empty(&con->writequeue))
+ return NULL;
+
+ e = list_first_entry(&con->writequeue, struct writequeue_entry,
+ list);
+ if (e->len == 0)
+ return NULL;
+
+ return e;
+}
static struct connection *__find_con(int nodeid, int r)
{
@@ -208,20 +238,6 @@ static int dlm_con_init(struct connection *con, int nodeid)
INIT_WORK(&con->rwork, process_recv_sockets);
init_waitqueue_head(&con->shutdown_wait);
- switch (dlm_config.ci_protocol) {
- case DLM_PROTO_TCP:
- con->connect_action = tcp_connect_to_sock;
- con->shutdown_action = dlm_tcp_shutdown;
- con->eof_condition = tcp_eof_condition;
- break;
- case DLM_PROTO_SCTP:
- con->connect_action = sctp_connect_to_sock;
- break;
- default:
- kfree(con->rx_buf);
- return -EINVAL;
- }
-
return 0;
}
@@ -249,6 +265,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL;
}
+ mutex_init(&con->wq_alloc);
+
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
@@ -583,8 +601,7 @@ static void lowcomms_error_report(struct sock *sk)
goto out;
orig_report = listen_sock.sk_error_report;
- if (con->sock == NULL ||
- kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
+ if (kernel_getpeername(sk->sk_socket, (struct sockaddr *)&saddr) < 0) {
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d, port %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
@@ -801,6 +818,7 @@ static void close_connection(struct connection *con, bool and_other,
con->rx_leftover = 0;
con->retries = 0;
+ clear_bit(CF_APP_LIMITED, &con->flags);
clear_bit(CF_CONNECTED, &con->flags);
clear_bit(CF_DELAY_CONNECT, &con->flags);
clear_bit(CF_RECONNECT, &con->flags);
@@ -877,7 +895,6 @@ static int con_realloc_receive_buf(struct connection *con, int newlen)
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
- int call_again_soon = 0;
struct msghdr msg;
struct kvec iov;
int ret, buflen;
@@ -897,41 +914,40 @@ static int receive_from_sock(struct connection *con)
goto out_resched;
}
- /* calculate new buffer parameter regarding last receive and
- * possible leftover bytes
- */
- iov.iov_base = con->rx_buf + con->rx_leftover;
- iov.iov_len = con->rx_buflen - con->rx_leftover;
-
- memset(&msg, 0, sizeof(msg));
- msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
- ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
- msg.msg_flags);
- if (ret <= 0)
- goto out_close;
- else if (ret == iov.iov_len)
- call_again_soon = 1;
-
- /* new buflen according readed bytes and leftover from last receive */
- buflen = ret + con->rx_leftover;
- ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
- if (ret < 0)
- goto out_close;
+ for (;;) {
+ /* calculate new buffer parameter regarding last receive and
+ * possible leftover bytes
+ */
+ iov.iov_base = con->rx_buf + con->rx_leftover;
+ iov.iov_len = con->rx_buflen - con->rx_leftover;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+ ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+ msg.msg_flags);
+ if (ret == -EAGAIN)
+ break;
+ else if (ret <= 0)
+ goto out_close;
- /* calculate leftover bytes from process and put it into begin of
- * the receive buffer, so next receive we have the full message
- * at the start address of the receive buffer.
- */
- con->rx_leftover = buflen - ret;
- if (con->rx_leftover) {
- memmove(con->rx_buf, con->rx_buf + ret,
- con->rx_leftover);
- call_again_soon = true;
+ /* new buflen according readed bytes and leftover from last receive */
+ buflen = ret + con->rx_leftover;
+ ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
+ if (ret < 0)
+ goto out_close;
+
+ /* calculate leftover bytes from process and put it into begin of
+ * the receive buffer, so next receive we have the full message
+ * at the start address of the receive buffer.
+ */
+ con->rx_leftover = buflen - ret;
+ if (con->rx_leftover) {
+ memmove(con->rx_buf, con->rx_buf + ret,
+ con->rx_leftover);
+ }
}
- if (call_again_soon)
- goto out_resched;
-
+ dlm_midcomms_receive_done(con->nodeid);
mutex_unlock(&con->sock_mutex);
return 0;
@@ -946,7 +962,8 @@ out_close:
log_print("connection %p got EOF from %d",
con, con->nodeid);
- if (con->eof_condition && con->eof_condition(con)) {
+ if (dlm_proto_ops->eof_condition &&
+ dlm_proto_ops->eof_condition(con)) {
set_bit(CF_EOF, &con->flags);
mutex_unlock(&con->sock_mutex);
} else {
@@ -1134,242 +1151,6 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
return result;
}
-/* Initiate an SCTP association.
- This is a special case of send_to_sock() in that we don't yet have a
- peeled-off socket for this association, so we use the listening socket
- and add the primary IP address of the remote node.
- */
-static void sctp_connect_to_sock(struct connection *con)
-{
- struct sockaddr_storage daddr;
- int result;
- int addr_len;
- struct socket *sock;
- unsigned int mark;
-
- mutex_lock(&con->sock_mutex);
-
- /* Some odd races can cause double-connects, ignore them */
- if (con->retries++ > MAX_CONNECT_RETRIES)
- goto out;
-
- if (con->sock) {
- log_print("node %d already connected.", con->nodeid);
- goto out;
- }
-
- memset(&daddr, 0, sizeof(daddr));
- result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
- if (result < 0) {
- log_print("no address for nodeid %d", con->nodeid);
- goto out;
- }
-
- /* Create a socket to communicate with */
- result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
- SOCK_STREAM, IPPROTO_SCTP, &sock);
- if (result < 0)
- goto socket_err;
-
- sock_set_mark(sock->sk, mark);
-
- add_sock(sock, con);
-
- /* Bind to all addresses. */
- if (sctp_bind_addrs(con->sock, 0))
- goto bind_err;
-
- make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
-
- log_print_ratelimited("connecting to %d", con->nodeid);
-
- /* Turn off Nagle's algorithm */
- sctp_sock_set_nodelay(sock->sk);
-
- /*
- * Make sock->ops->connect() function return in specified time,
- * since O_NONBLOCK argument in connect() function does not work here,
- * then, we should restore the default value of this attribute.
- */
- sock_set_sndtimeo(sock->sk, 5);
- result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
- 0);
- sock_set_sndtimeo(sock->sk, 0);
-
- if (result == -EINPROGRESS)
- result = 0;
- if (result == 0) {
- if (!test_and_set_bit(CF_CONNECTED, &con->flags))
- log_print("successful connected to node %d", con->nodeid);
- goto out;
- }
-
-bind_err:
- con->sock = NULL;
- sock_release(sock);
-
-socket_err:
- /*
- * Some errors are fatal and this list might need adjusting. For other
- * errors we try again until the max number of retries is reached.
- */
- if (result != -EHOSTUNREACH &&
- result != -ENETUNREACH &&
- result != -ENETDOWN &&
- result != -EINVAL &&
- result != -EPROTONOSUPPORT) {
- log_print("connect %d try %d error %d", con->nodeid,
- con->retries, result);
- mutex_unlock(&con->sock_mutex);
- msleep(1000);
- lowcomms_connect_sock(con);
- return;
- }
-
-out:
- mutex_unlock(&con->sock_mutex);
-}
-
-/* Connect a new socket to its peer */
-static void tcp_connect_to_sock(struct connection *con)
-{
- struct sockaddr_storage saddr, src_addr;
- unsigned int mark;
- int addr_len;
- struct socket *sock = NULL;
- int result;
-
- mutex_lock(&con->sock_mutex);
- if (con->retries++ > MAX_CONNECT_RETRIES)
- goto out;
-
- /* Some odd races can cause double-connects, ignore them */
- if (con->sock)
- goto out;
-
- /* Create a socket to communicate with */
- result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
- SOCK_STREAM, IPPROTO_TCP, &sock);
- if (result < 0)
- goto out_err;
-
- memset(&saddr, 0, sizeof(saddr));
- result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
- if (result < 0) {
- log_print("no address for nodeid %d", con->nodeid);
- goto out_err;
- }
-
- sock_set_mark(sock->sk, mark);
-
- add_sock(sock, con);
-
- /* Bind to our cluster-known address connecting to avoid
- routing problems */
- memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
- make_sockaddr(&src_addr, 0, &addr_len);
- result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
- addr_len);
- if (result < 0) {
- log_print("could not bind for connect: %d", result);
- /* This *may* not indicate a critical error */
- }
-
- make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
-
- log_print_ratelimited("connecting to %d", con->nodeid);
-
- /* Turn off Nagle's algorithm */
- tcp_sock_set_nodelay(sock->sk);
-
- result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
- O_NONBLOCK);
- if (result == -EINPROGRESS)
- result = 0;
- if (result == 0)
- goto out;
-
-out_err:
- if (con->sock) {
- sock_release(con->sock);
- con->sock = NULL;
- } else if (sock) {
- sock_release(sock);
- }
- /*
- * Some errors are fatal and this list might need adjusting. For other
- * errors we try again until the max number of retries is reached.
- */
- if (result != -EHOSTUNREACH &&
- result != -ENETUNREACH &&
- result != -ENETDOWN &&
- result != -EINVAL &&
- result != -EPROTONOSUPPORT) {
- log_print("connect %d try %d error %d", con->nodeid,
- con->retries, result);
- mutex_unlock(&con->sock_mutex);
- msleep(1000);
- lowcomms_connect_sock(con);
- return;
- }
-out:
- mutex_unlock(&con->sock_mutex);
- return;
-}
-
-/* On error caller must run dlm_close_sock() for the
- * listen connection socket.
- */
-static int tcp_create_listen_sock(struct listen_connection *con,
- struct sockaddr_storage *saddr)
-{
- struct socket *sock = NULL;
- int result = 0;
- int addr_len;
-
- if (dlm_local_addr[0]->ss_family == AF_INET)
- addr_len = sizeof(struct sockaddr_in);
- else
- addr_len = sizeof(struct sockaddr_in6);
-
- /* Create a socket to communicate with */
- result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
- SOCK_STREAM, IPPROTO_TCP, &sock);
- if (result < 0) {
- log_print("Can't create listening comms socket");
- goto create_out;
- }
-
- sock_set_mark(sock->sk, dlm_config.ci_mark);
-
- /* Turn off Nagle's algorithm */
- tcp_sock_set_nodelay(sock->sk);
-
- sock_set_reuseaddr(sock->sk);
-
- add_listen_sock(sock, con);
-
- /* Bind to our port */
- make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
- result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
- if (result < 0) {
- log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
- goto create_out;
- }
- sock_set_keepalive(sock->sk);
-
- result = sock->ops->listen(sock, 5);
- if (result < 0) {
- log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
- goto create_out;
- }
-
- return 0;
-
-create_out:
- return result;
-}
-
/* Get local addresses */
static void init_local(void)
{
@@ -1396,63 +1177,6 @@ static void deinit_local(void)
kfree(dlm_local_addr[i]);
}
-/* Initialise SCTP socket and bind to all interfaces
- * On error caller must run dlm_close_sock() for the
- * listen connection socket.
- */
-static int sctp_listen_for_all(struct listen_connection *con)
-{
- struct socket *sock = NULL;
- int result = -EINVAL;
-
- log_print("Using SCTP for communications");
-
- result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
- SOCK_STREAM, IPPROTO_SCTP, &sock);
- if (result < 0) {
- log_print("Can't create comms socket, check SCTP is loaded");
- goto out;
- }
-
- sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
- sock_set_mark(sock->sk, dlm_config.ci_mark);
- sctp_sock_set_nodelay(sock->sk);
-
- add_listen_sock(sock, con);
-
- /* Bind to all addresses. */
- result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port);
- if (result < 0)
- goto out;
-
- result = sock->ops->listen(sock, 5);
- if (result < 0) {
- log_print("Can't set socket listening");
- goto out;
- }
-
- return 0;
-
-out:
- return result;
-}
-
-static int tcp_listen_for_all(void)
-{
- /* We don't support multi-homed hosts */
- if (dlm_local_count > 1) {
- log_print("TCP protocol can't handle multi-homed hosts, "
- "try SCTP");
- return -EINVAL;
- }
-
- log_print("Using TCP for communications");
-
- return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]);
-}
-
-
-
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
gfp_t allocation)
{
@@ -1528,19 +1252,37 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
{
struct writequeue_entry *e;
struct dlm_msg *msg;
+ bool sleepable;
msg = kzalloc(sizeof(*msg), allocation);
if (!msg)
return NULL;
+ /* this mutex is being used as a wait to avoid multiple "fast"
+ * new writequeue page list entry allocs in new_wq_entry in
+ * normal operation which is sleepable context. Without it
+ * we could end in multiple writequeue entries with one
+ * dlm message because multiple callers were waiting at
+ * the writequeue_lock in new_wq_entry().
+ */
+ sleepable = gfpflags_normal_context(allocation);
+ if (sleepable)
+ mutex_lock(&con->wq_alloc);
+
kref_init(&msg->ref);
e = new_wq_entry(con, len, allocation, ppc, cb, mh);
if (!e) {
+ if (sleepable)
+ mutex_unlock(&con->wq_alloc);
+
kfree(msg);
return NULL;
}
+ if (sleepable)
+ mutex_unlock(&con->wq_alloc);
+
msg->ppc = *ppc;
msg->len = len;
msg->entry = e;
@@ -1646,10 +1388,9 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
/* Send a message */
static void send_to_sock(struct connection *con)
{
- int ret = 0;
const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
struct writequeue_entry *e;
- int len, offset;
+ int len, offset, ret;
int count = 0;
mutex_lock(&con->sock_mutex);
@@ -1658,7 +1399,8 @@ static void send_to_sock(struct connection *con)
spin_lock(&con->writequeue_lock);
for (;;) {
- if (list_empty(&con->writequeue))
+ e = con_next_wq(con);
+ if (!e)
break;
e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
@@ -1667,25 +1409,22 @@ static void send_to_sock(struct connection *con)
BUG_ON(len == 0 && e->users == 0);
spin_unlock(&con->writequeue_lock);
- ret = 0;
- if (len) {
- ret = kernel_sendpage(con->sock, e->page, offset, len,
- msg_flags);
- if (ret == -EAGAIN || ret == 0) {
- if (ret == -EAGAIN &&
- test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
- !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
- /* Notify TCP that we're limited by the
- * application window size.
- */
- set_bit(SOCK_NOSPACE, &con->sock->flags);
- con->sock->sk->sk_write_pending++;
- }
- cond_resched();
- goto out;
- } else if (ret < 0)
- goto out;
- }
+ ret = kernel_sendpage(con->sock, e->page, offset, len,
+ msg_flags);
+ if (ret == -EAGAIN || ret == 0) {
+ if (ret == -EAGAIN &&
+ test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
+ !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+ /* Notify TCP that we're limited by the
+ * application window size.
+ */
+ set_bit(SOCK_NOSPACE, &con->sock->flags);
+ con->sock->sk->sk_write_pending++;
+ }
+ cond_resched();
+ goto out;
+ } else if (ret < 0)
+ goto out;
/* Don't starve people filling buffers */
if (++count >= MAX_SEND_MSG_COUNT) {
@@ -1770,12 +1509,9 @@ int dlm_lowcomms_close(int nodeid)
static void process_recv_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, rwork);
- int err;
clear_bit(CF_READ_PENDING, &con->flags);
- do {
- err = receive_from_sock(con);
- } while (!err);
+ receive_from_sock(con);
}
static void process_listen_recv_socket(struct work_struct *work)
@@ -1783,6 +1519,74 @@ static void process_listen_recv_socket(struct work_struct *work)
accept_from_sock(&listen_con);
}
+static void dlm_connect(struct connection *con)
+{
+ struct sockaddr_storage addr;
+ int result, addr_len;
+ struct socket *sock;
+ unsigned int mark;
+
+ /* Some odd races can cause double-connects, ignore them */
+ if (con->retries++ > MAX_CONNECT_RETRIES)
+ return;
+
+ if (con->sock) {
+ log_print("node %d already connected.", con->nodeid);
+ return;
+ }
+
+ memset(&addr, 0, sizeof(addr));
+ result = nodeid_to_addr(con->nodeid, &addr, NULL,
+ dlm_proto_ops->try_new_addr, &mark);
+ if (result < 0) {
+ log_print("no address for nodeid %d", con->nodeid);
+ return;
+ }
+
+ /* Create a socket to communicate with */
+ result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+ SOCK_STREAM, dlm_proto_ops->proto, &sock);
+ if (result < 0)
+ goto socket_err;
+
+ sock_set_mark(sock->sk, mark);
+ dlm_proto_ops->sockopts(sock);
+
+ add_sock(sock, con);
+
+ result = dlm_proto_ops->bind(sock);
+ if (result < 0)
+ goto add_sock_err;
+
+ log_print_ratelimited("connecting to %d", con->nodeid);
+ make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
+ result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
+ addr_len);
+ if (result < 0)
+ goto add_sock_err;
+
+ return;
+
+add_sock_err:
+ dlm_close_sock(&con->sock);
+
+socket_err:
+ /*
+ * Some errors are fatal and this list might need adjusting. For other
+ * errors we try again until the max number of retries is reached.
+ */
+ if (result != -EHOSTUNREACH &&
+ result != -ENETUNREACH &&
+ result != -ENETDOWN &&
+ result != -EINVAL &&
+ result != -EPROTONOSUPPORT) {
+ log_print("connect %d try %d error %d", con->nodeid,
+ con->retries, result);
+ msleep(1000);
+ lowcomms_connect_sock(con);
+ }
+}
+
/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
@@ -1797,11 +1601,15 @@ static void process_send_sockets(struct work_struct *work)
dlm_midcomms_unack_msg_resend(con->nodeid);
}
- if (con->sock == NULL) { /* not mutex protected so check it inside too */
+ if (con->sock == NULL) {
if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
msleep(1000);
- con->connect_action(con);
+
+ mutex_lock(&con->sock_mutex);
+ dlm_connect(con);
+ mutex_unlock(&con->sock_mutex);
}
+
if (!list_empty(&con->writequeue))
send_to_sock(con);
}
@@ -1840,8 +1648,8 @@ static int work_start(void)
static void shutdown_conn(struct connection *con)
{
- if (con->shutdown_action)
- con->shutdown_action(con);
+ if (dlm_proto_ops->shutdown_action)
+ dlm_proto_ops->shutdown_action(con);
}
void dlm_lowcomms_shutdown(void)
@@ -1948,8 +1756,198 @@ void dlm_lowcomms_stop(void)
srcu_read_unlock(&connections_srcu, idx);
work_stop();
deinit_local();
+
+ dlm_proto_ops = NULL;
}
+static int dlm_listen_for_all(void)
+{
+ struct socket *sock;
+ int result;
+
+ log_print("Using %s for communications",
+ dlm_proto_ops->name);
+
+ result = dlm_proto_ops->listen_validate();
+ if (result < 0)
+ return result;
+
+ result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+ SOCK_STREAM, dlm_proto_ops->proto, &sock);
+ if (result < 0) {
+ log_print("Can't create comms socket, check SCTP is loaded");
+ goto out;
+ }
+
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
+ dlm_proto_ops->listen_sockopts(sock);
+
+ result = dlm_proto_ops->listen_bind(sock);
+ if (result < 0)
+ goto out;
+
+ save_listen_callbacks(sock);
+ add_listen_sock(sock, &listen_con);
+
+ INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
+ result = sock->ops->listen(sock, 5);
+ if (result < 0) {
+ dlm_close_sock(&listen_con.sock);
+ goto out;
+ }
+
+ return 0;
+
+out:
+ sock_release(sock);
+ return result;
+}
+
+static int dlm_tcp_bind(struct socket *sock)
+{
+ struct sockaddr_storage src_addr;
+ int result, addr_len;
+
+ /* Bind to our cluster-known address connecting to avoid
+ * routing problems.
+ */
+ memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
+ make_sockaddr(&src_addr, 0, &addr_len);
+
+ result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
+ addr_len);
+ if (result < 0) {
+ /* This *may* not indicate a critical error */
+ log_print("could not bind for connect: %d", result);
+ }
+
+ return 0;
+}
+
+static int dlm_tcp_connect(struct connection *con, struct socket *sock,
+ struct sockaddr *addr, int addr_len)
+{
+ int ret;
+
+ ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
+ switch (ret) {
+ case -EINPROGRESS:
+ fallthrough;
+ case 0:
+ return 0;
+ }
+
+ return ret;
+}
+
+static int dlm_tcp_listen_validate(void)
+{
+ /* We don't support multi-homed hosts */
+ if (dlm_local_count > 1) {
+ log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static void dlm_tcp_sockopts(struct socket *sock)
+{
+ /* Turn off Nagle's algorithm */
+ tcp_sock_set_nodelay(sock->sk);
+}
+
+static void dlm_tcp_listen_sockopts(struct socket *sock)
+{
+ dlm_tcp_sockopts(sock);
+ sock_set_reuseaddr(sock->sk);
+}
+
+static int dlm_tcp_listen_bind(struct socket *sock)
+{
+ int addr_len;
+
+ /* Bind to our port */
+ make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
+ return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
+ addr_len);
+}
+
+static const struct dlm_proto_ops dlm_tcp_ops = {
+ .name = "TCP",
+ .proto = IPPROTO_TCP,
+ .connect = dlm_tcp_connect,
+ .sockopts = dlm_tcp_sockopts,
+ .bind = dlm_tcp_bind,
+ .listen_validate = dlm_tcp_listen_validate,
+ .listen_sockopts = dlm_tcp_listen_sockopts,
+ .listen_bind = dlm_tcp_listen_bind,
+ .shutdown_action = dlm_tcp_shutdown,
+ .eof_condition = tcp_eof_condition,
+};
+
+static int dlm_sctp_bind(struct socket *sock)
+{
+ return sctp_bind_addrs(sock, 0);
+}
+
+static int dlm_sctp_connect(struct connection *con, struct socket *sock,
+ struct sockaddr *addr, int addr_len)
+{
+ int ret;
+
+ /*
+ * Make sock->ops->connect() function return in specified time,
+ * since O_NONBLOCK argument in connect() function does not work here,
+ * then, we should restore the default value of this attribute.
+ */
+ sock_set_sndtimeo(sock->sk, 5);
+ ret = sock->ops->connect(sock, addr, addr_len, 0);
+ sock_set_sndtimeo(sock->sk, 0);
+ if (ret < 0)
+ return ret;
+
+ if (!test_and_set_bit(CF_CONNECTED, &con->flags))
+ log_print("successful connected to node %d", con->nodeid);
+
+ return 0;
+}
+
+static int dlm_sctp_listen_validate(void)
+{
+ if (!IS_ENABLED(CONFIG_IP_SCTP)) {
+ log_print("SCTP is not enabled by this kernel");
+ return -EOPNOTSUPP;
+ }
+
+ request_module("sctp");
+ return 0;
+}
+
+static int dlm_sctp_bind_listen(struct socket *sock)
+{
+ return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
+}
+
+static void dlm_sctp_sockopts(struct socket *sock)
+{
+ /* Turn off Nagle's algorithm */
+ sctp_sock_set_nodelay(sock->sk);
+ sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
+}
+
+static const struct dlm_proto_ops dlm_sctp_ops = {
+ .name = "SCTP",
+ .proto = IPPROTO_SCTP,
+ .try_new_addr = true,
+ .connect = dlm_sctp_connect,
+ .sockopts = dlm_sctp_sockopts,
+ .bind = dlm_sctp_bind,
+ .listen_validate = dlm_sctp_listen_validate,
+ .listen_sockopts = dlm_sctp_sockopts,
+ .listen_bind = dlm_sctp_bind_listen,
+};
+
int dlm_lowcomms_start(void)
{
int error = -EINVAL;
@@ -1976,23 +1974,27 @@ int dlm_lowcomms_start(void)
/* Start listening */
switch (dlm_config.ci_protocol) {
case DLM_PROTO_TCP:
- error = tcp_listen_for_all();
+ dlm_proto_ops = &dlm_tcp_ops;
break;
case DLM_PROTO_SCTP:
- error = sctp_listen_for_all(&listen_con);
+ dlm_proto_ops = &dlm_sctp_ops;
break;
default:
log_print("Invalid protocol identifier %d set",
dlm_config.ci_protocol);
error = -EINVAL;
- break;
+ goto fail_proto_ops;
}
+
+ error = dlm_listen_for_all();
if (error)
- goto fail_unlisten;
+ goto fail_listen;
return 0;
-fail_unlisten:
+fail_listen:
+ dlm_proto_ops = NULL;
+fail_proto_ops:
dlm_allow_conn = 0;
dlm_close_sock(&listen_con.sock);
work_stop();
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index aaae7115c00d..4ccae07cf005 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -46,6 +46,7 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
+void dlm_midcomms_receive_done(int nodeid);
#endif /* __LOWCOMMS_DOT_H__ */
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index d9e1e4170eb1..731d489aa323 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -443,8 +443,10 @@ static int ping_members(struct dlm_ls *ls)
list_for_each_entry(memb, &ls->ls_nodes, list) {
error = dlm_recovery_stopped(ls);
- if (error)
+ if (error) {
+ error = -EINTR;
break;
+ }
error = dlm_rcom_status(ls, memb->nodeid, 0);
if (error)
break;
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index e3de268898ed..7ae39ec8d9b0 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -109,12 +109,6 @@
* compatibility. There exists better ways to make a better handling.
* However this should be changed in the next major version bump of dlm.
*
- * Ack handling:
- *
- * Currently we send an ack message for every dlm message. However we
- * can ack multiple dlm messages with one ack by just delaying the ack
- * message. Will reduce some traffic but makes the drop detection slower.
- *
* Tail Size checking:
*
* There exists a message tail payload in e.g. DLM_MSG however we don't
@@ -169,6 +163,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
+#define DLM_NODE_ULP_DELIVERED 4
unsigned long flags;
wait_queue_head_t shutdown_wait;
@@ -480,11 +475,12 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
{
if (seq == node->seq_next) {
node->seq_next++;
- /* send ack before fin */
- dlm_send_ack(node->nodeid, node->seq_next);
switch (p->header.h_cmd) {
case DLM_FIN:
+ /* send ack before fin */
+ dlm_send_ack(node->nodeid, node->seq_next);
+
spin_lock(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -534,6 +530,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
default:
WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer(p, node->nodeid);
+ set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
break;
}
} else {
@@ -933,6 +930,49 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
return ret;
}
+void dlm_midcomms_receive_done(int nodeid)
+{
+ struct midcomms_node *node;
+ int idx;
+
+ idx = srcu_read_lock(&nodes_srcu);
+ node = nodeid2node(nodeid, 0);
+ if (!node) {
+ srcu_read_unlock(&nodes_srcu, idx);
+ return;
+ }
+
+ /* old protocol, we do nothing */
+ switch (node->version) {
+ case DLM_VERSION_3_2:
+ break;
+ default:
+ srcu_read_unlock(&nodes_srcu, idx);
+ return;
+ }
+
+ /* do nothing if we didn't delivered stateful to ulp */
+ if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
+ &node->flags)) {
+ srcu_read_unlock(&nodes_srcu, idx);
+ return;
+ }
+
+ spin_lock(&node->state_lock);
+ /* we only ack if state is ESTABLISHED */
+ switch (node->state) {
+ case DLM_ESTABLISHED:
+ spin_unlock(&node->state_lock);
+ dlm_send_ack(node->nodeid, node->seq_next);
+ break;
+ default:
+ spin_unlock(&node->state_lock);
+ /* do nothing FIN has it's own ack send */
+ break;
+ };
+ srcu_read_unlock(&nodes_srcu, idx);
+}
+
void dlm_midcomms_unack_msg_resend(int nodeid)
{
struct midcomms_node *node;
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 5651933f54a4..6cba86470278 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -89,22 +89,15 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
return 0;
}
-static void _send_rcom(struct dlm_ls *ls, struct dlm_rcom *rc)
+static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
{
dlm_rcom_out(rc);
-}
-
-static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
- struct dlm_rcom *rc)
-{
- _send_rcom(ls, rc);
dlm_midcomms_commit_mhandle(mh);
}
-static void send_rcom_stateless(struct dlm_ls *ls, struct dlm_msg *msg,
- struct dlm_rcom *rc)
+static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
{
- _send_rcom(ls, rc);
+ dlm_rcom_out(rc);
dlm_lowcomms_commit_msg(msg);
dlm_lowcomms_put_msg(msg);
}
@@ -204,7 +197,7 @@ retry:
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
- send_rcom_stateless(ls, msg, rc);
+ send_rcom_stateless(msg, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
@@ -287,7 +280,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_recover_lock);
do_send:
- send_rcom_stateless(ls, msg, rc);
+ send_rcom_stateless(msg, rc);
}
static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
@@ -327,7 +320,7 @@ retry:
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
- send_rcom_stateless(ls, msg, rc);
+ send_rcom_stateless(msg, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
@@ -356,7 +349,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid);
- send_rcom_stateless(ls, msg, rc);
+ send_rcom_stateless(msg, rc);
}
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
@@ -373,7 +366,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
memcpy(rc->rc_buf, r->res_name, r->res_length);
rc->rc_id = (unsigned long) r->res_id;
- send_rcom(ls, mh, rc);
+ send_rcom(mh, rc);
out:
return error;
}
@@ -404,7 +397,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq;
- send_rcom(ls, mh, rc);
+ send_rcom(mh, rc);
}
static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
@@ -461,7 +454,7 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
pack_rcom_lock(r, lkb, rl);
rc->rc_id = (unsigned long) r;
- send_rcom(ls, mh, rc);
+ send_rcom(mh, rc);
out:
return error;
}
@@ -487,7 +480,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq;
- send_rcom(ls, mh, rc);
+ send_rcom(mh, rc);
}
/* If the lockspace doesn't exist then still send a status message
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 85e245392715..97d052cea5a9 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -125,8 +125,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_recover_waiters_pre(ls);
error = dlm_recovery_stopped(ls);
- if (error)
+ if (error) {
+ error = -EINTR;
goto fail;
+ }
if (neg || dlm_no_directory(ls)) {
/*