diff options
Diffstat (limited to 'fs/dlm/lowcomms-sctp.c')
-rw-r--r-- | fs/dlm/lowcomms-sctp.c | 151 |
1 files changed, 67 insertions, 84 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c index fe158d7a9285..dc83a9d979b5 100644 --- a/fs/dlm/lowcomms-sctp.c +++ b/fs/dlm/lowcomms-sctp.c @@ -72,6 +72,8 @@ struct nodeinfo { struct list_head writequeue; /* outgoing writequeue_entries */ spinlock_t writequeue_lock; int nodeid; + struct work_struct swork; /* Send workqueue */ + struct work_struct lwork; /* Locking workqueue */ }; static DEFINE_IDR(nodeinfo_idr); @@ -96,6 +98,7 @@ struct connection { atomic_t waiting_requests; struct cbuf cb; int eagain_flag; + struct work_struct work; /* Send workqueue */ }; /* An entry waiting to be sent */ @@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n) static LIST_HEAD(write_nodes); static DEFINE_SPINLOCK(write_nodes_lock); + /* Maximum number of incoming messages to process before * doing a schedule() */ #define MAX_RX_MSG_COUNT 25 -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait); +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; +static struct workqueue_struct *lock_workqueue; /* The SCTP connection */ static struct connection sctp_con; +static void process_send_sockets(struct work_struct *work); +static void process_recv_sockets(struct work_struct *work); +static void process_lock_request(struct work_struct *work); static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) { @@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) spin_lock_init(&ni->lock); INIT_LIST_HEAD(&ni->writequeue); spin_lock_init(&ni->writequeue_lock); + INIT_WORK(&ni->lwork, process_lock_request); + INIT_WORK(&ni->swork, process_send_sockets); ni->nodeid = nodeid; if (nodeid > max_nodeid) @@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) /* Data or notification available on socket */ static void lowcomms_data_ready(struct sock *sk, int count_unused) { - atomic_inc(&sctp_con.waiting_requests); if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) - return; - - wake_up_interruptible(&lowcomms_recv_wait); + queue_work(recv_workqueue, &sctp_con.work); } @@ -361,10 +367,10 @@ static void init_failed(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } } } - wake_up_process(send_task); } /* Something happened to an association */ @@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } break; @@ -580,8 +586,8 @@ static int receive_from_sock(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } } @@ -590,6 +596,7 @@ static int receive_from_sock(void) return 0; cbuf_add(&sctp_con.cb, ret); + // PJC: TODO: Add to node's workqueue....can we ?? ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), page_address(sctp_con.rx_page), sctp_con.cb.base, sctp_con.cb.len, @@ -635,7 +642,7 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num) if (result < 0) log_print("Can't bind to port %d addr number %d", - dlm_config.tcp_port, num); + dlm_config.ci_tcp_port, num); return result; } @@ -711,7 +718,7 @@ static int init_sock(void) /* Bind to all interfaces. */ for (i = 0; i < dlm_local_count; i++) { memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); - make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len); + make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); result = add_bind_addr(&localaddr, addr_len, num); if (result) @@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); - wake_up_process(send_task); + + queue_work(send_workqueue, &ni->swork); } return; @@ -863,7 +871,7 @@ static void initiate_association(int nodeid) return; } - make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen); + make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); outmessage.msg_name = &rem_addr; outmessage.msg_namelen = addrlen; @@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid) return 0; } -static int write_list_empty(void) +// PJC: The work queue function for receiving. +static void process_recv_sockets(struct work_struct *work) { - int status; - - spin_lock_bh(&write_nodes_lock); - status = list_empty(&write_nodes); - spin_unlock_bh(&write_nodes_lock); - - return status; -} - -static int dlm_recvd(void *data) -{ - DECLARE_WAITQUEUE(wait, current); - - while (!kthread_should_stop()) { + if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { + int ret; int count = 0; - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&lowcomms_recv_wait, &wait); - if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) - cond_resched(); - remove_wait_queue(&lowcomms_recv_wait, &wait); - set_current_state(TASK_RUNNING); - - if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { - int ret; - - do { - ret = receive_from_sock(); + do { + ret = receive_from_sock(); - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - cond_resched(); - count = 0; - } - } while (!kthread_should_stop() && ret >=0); - } - cond_resched(); + /* Don't starve out everyone else */ + if (++count >= MAX_RX_MSG_COUNT) { + cond_resched(); + count = 0; + } + } while (!kthread_should_stop() && ret >=0); } - - return 0; + cond_resched(); } -static int dlm_sendd(void *data) +// PJC: the work queue function for sending +static void process_send_sockets(struct work_struct *work) { - DECLARE_WAITQUEUE(wait, current); - - add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (write_list_empty()) - cond_resched(); - set_current_state(TASK_RUNNING); - - if (sctp_con.eagain_flag) { - sctp_con.eagain_flag = 0; - refill_write_queue(); - } - process_output_queue(); + if (sctp_con.eagain_flag) { + sctp_con.eagain_flag = 0; + refill_write_queue(); } + process_output_queue(); +} - remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - return 0; +// PJC: Process lock requests from a particular node. +// TODO: can we optimise this out on UP ?? +static void process_lock_request(struct work_struct *work) +{ } static void daemons_stop(void) { - kthread_stop(recv_task); - kthread_stop(send_task); + destroy_workqueue(recv_workqueue); + destroy_workqueue(send_workqueue); + destroy_workqueue(lock_workqueue); } static int daemons_start(void) { - struct task_struct *p; int error; + recv_workqueue = create_workqueue("dlm_recv"); + error = IS_ERR(recv_workqueue); + if (error) { + log_print("can't start dlm_recv %d", error); + return error; + } - p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); - error = IS_ERR(p); + send_workqueue = create_singlethread_workqueue("dlm_send"); + error = IS_ERR(send_workqueue); if (error) { - log_print("can't start dlm_recvd %d", error); + log_print("can't start dlm_send %d", error); + destroy_workqueue(recv_workqueue); return error; } - recv_task = p; - p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); - error = IS_ERR(p); + lock_workqueue = create_workqueue("dlm_rlock"); + error = IS_ERR(lock_workqueue); if (error) { - log_print("can't start dlm_sendd %d", error); - kthread_stop(recv_task); + log_print("can't start dlm_rlock %d", error); + destroy_workqueue(send_workqueue); + destroy_workqueue(recv_workqueue); return error; } - send_task = p; return 0; } @@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void) { int error; + INIT_WORK(&sctp_con.work, process_recv_sockets); + error = init_sock(); if (error) goto fail_sock; @@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void) for (i = 0; i < dlm_local_count; i++) kfree(dlm_local_addr[i]); } - |