[DLM] Use workqueues for dlm lowcomms
[linux-2.6] fs/dlm/lowcomms-tcp.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 /*
15  * lowcomms-tcp.c
16  *
17  * This is the "low-level" comms layer.
18  *
19  * It is responsible for sending/receiving messages
20  * from other nodes in the cluster.
21  *
22  * Cluster nodes are referred to by their nodeids. nodeids are
23  * simply 32-bit numbers to the locking module - if they need to
24  * be expanded for the cluster infrastructure then that is its
25  * responsibility. It is this layer's responsibility to resolve
26  * these nodeids into IP addresses or whatever else it needs for
27  * inter-node communication.
28  *
29  * The comms level is two workqueues: a recv workqueue that deals mainly
30  * with receiving messages from other nodes and passing them
31  * up to the mid-level comms layer (which understands the
32  * message format) for execution by the locking core, and
33  * a send workqueue which does all the setting up of connections
34  * to remote nodes and the sending of data. Other contexts are not
35  * allowed to send their own data because it may cause them to wait in
36  * times of high load. Also, this way, the send worker can collect
37  * together messages bound for one node and send them in one block.
38  *
39  * I don't see any problem with the recv work executing the locking
40  * code on behalf of remote processes as the locking code is
41  * short, efficient and never waits.
42  *
43  */
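
/*
 * A rough sketch of how the pieces below fit together (details are in the
 * individual functions):
 *
 *   dlm_lowcomms_start() allocates the connection array, creates the
 *   listening socket (listen_for_all()/create_listen_sock()) and starts the
 *   dlm_recv and dlm_send workqueues.  Incoming data fires
 *   lowcomms_data_ready(), which queues the connection's rwork;
 *   process_recv_sockets() then runs rx_action() - accept_from_sock() for
 *   the listening socket, receive_from_sock() for a node connection - and
 *   received data is handed to dlm_process_incoming_buffer().  Outgoing
 *   messages are built with dlm_lowcomms_get_buffer() and
 *   dlm_lowcomms_commit_buffer(), which queue swork; process_send_sockets()
 *   connects if needed (connect_to_sock()) and flushes the writequeue via
 *   send_to_sock().
 */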
44
45
46 #include <asm/ioctls.h>
47 #include <net/sock.h>
48 #include <net/tcp.h>
49 #include <linux/pagemap.h>
50
51 #include "dlm_internal.h"
52 #include "lowcomms.h"
53 #include "midcomms.h"
54 #include "config.h"
55
56 struct cbuf {
57         unsigned int base;
58         unsigned int len;
59         unsigned int mask;
60 };
61
62 #define NODE_INCREMENT 32
63 static void cbuf_add(struct cbuf *cb, int n)
64 {
65         cb->len += n;
66 }
67
68 static int cbuf_data(struct cbuf *cb)
69 {
70         return ((cb->base + cb->len) & cb->mask);
71 }
72
73 static void cbuf_init(struct cbuf *cb, int size)
74 {
75         cb->base = cb->len = 0;
76         cb->mask = size-1;
77 }
78
79 static void cbuf_eat(struct cbuf *cb, int n)
80 {
81         cb->len  -= n;
82         cb->base += n;
83         cb->base &= cb->mask;
84 }
85
86 static bool cbuf_empty(struct cbuf *cb)
87 {
88         return cb->len == 0;
89 }
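
/* Worked example of the cbuf bookkeeping above (illustrative only, assuming
   a 4096-byte rx_page, so mask == 4095):

	cbuf_init(&cb, 4096);	base == 0, len == 0
	cbuf_add(&cb, 700);	700 bytes received; used region is [0, 700)
	cbuf_data(&cb);		== 700, where the next receive lands
	cbuf_eat(&cb, 300);	300 bytes consumed; base == 300, len == 400
	cbuf_empty(&cb);	false until the remaining 400 bytes are eaten

   Once base + len passes the end of the page the used region wraps, and
   cbuf_data() wraps with it because of the mask. */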
90
91 /* Maximum number of incoming messages to process before
92    doing a cond_resched()
93 */
94 #define MAX_RX_MSG_COUNT 25
95
96 struct connection {
97         struct socket *sock;    /* NULL if not connected */
98         uint32_t nodeid;        /* So we know who we are in the list */
99         struct rw_semaphore sock_sem; /* Stop connect races */
100         struct list_head read_list;   /* On this list when ready for reading */
101         struct list_head write_list;  /* On this list when ready for writing */
102         struct list_head state_list;  /* On this list when ready to connect */
103         unsigned long flags;    /* CF_* bit numbers, defined below */
104 #define CF_READ_PENDING 1
105 #define CF_WRITE_PENDING 2
106 #define CF_CONNECT_PENDING 3
107 #define CF_IS_OTHERCON 4
108         struct list_head writequeue;  /* List of outgoing writequeue_entries */
109         struct list_head listenlist;  /* List of allocated listening sockets */
110         spinlock_t writequeue_lock;
111         int (*rx_action) (struct connection *); /* What to do when active */
112         struct page *rx_page;
113         struct cbuf cb;
114         int retries;
115         atomic_t waiting_requests;
116 #define MAX_CONNECT_RETRIES 3
117         struct connection *othercon;
118         struct work_struct rwork; /* Receive workqueue */
119         struct work_struct swork; /* Send workqueue */
120 };
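
/*
 * Note: rwork and swork are queued on the dlm_recv and dlm_send workqueues
 * created in work_start().  othercon holds the extra socket that appears
 * when two nodes connect to each other at the same time and the connections
 * cross on the wire (see accept_from_sock()).
 */
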
121 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
122
123 /* An entry waiting to be sent */
124 struct writequeue_entry {
125         struct list_head list;
126         struct page *page;
127         int offset;
128         int len;
129         int end;
130         int users;
131         struct connection *con;
132 };
133
134 static struct sockaddr_storage dlm_local_addr;
135
136 /* Work queues */
137 static struct workqueue_struct *recv_workqueue;
138 static struct workqueue_struct *send_workqueue;
139
140 /* An array of pointers to connections, indexed by NODEID */
141 static struct connection **connections;
142 static DECLARE_MUTEX(connections_lock);
143 static struct kmem_cache *con_cache;
144 static int conn_array_size;
145
146 static void process_recv_sockets(struct work_struct *work);
147 static void process_send_sockets(struct work_struct *work);
148
149 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
150 {
151         struct connection *con = NULL;
152
153         down(&connections_lock);
154         if (nodeid >= conn_array_size) {
155                 int new_size = nodeid + NODE_INCREMENT;
156                 struct connection **new_conns;
157
158                 new_conns = kzalloc(sizeof(struct connection *) *
159                                     new_size, allocation);
160                 if (!new_conns)
161                         goto finish;
162
163                 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
164                 conn_array_size = new_size;
165                 kfree(connections);
166                 connections = new_conns;
167
168         }
169
170         con = connections[nodeid];
171         if (con == NULL && allocation) {
172                 con = kmem_cache_zalloc(con_cache, allocation);
173                 if (!con)
174                         goto finish;
175
176                 con->nodeid = nodeid;
177                 init_rwsem(&con->sock_sem);
178                 INIT_LIST_HEAD(&con->writequeue);
179                 spin_lock_init(&con->writequeue_lock);
180                 INIT_WORK(&con->swork, process_send_sockets);
181                 INIT_WORK(&con->rwork, process_recv_sockets);
182
183                 connections[nodeid] = con;
184         }
185
186 finish:
187         up(&connections_lock);
188         return con;
189 }
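
/*
 * Illustrative example of the growth above: dlm_lowcomms_start() begins with
 * conn_array_size == NODE_INCREMENT (32), so the first lookup of, say,
 * nodeid 40 reallocates the array to 40 + NODE_INCREMENT == 72 entries
 * before allocating that node's struct connection from con_cache.
 */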
190
191 /* Data available on socket or listen socket received a connect */
192 static void lowcomms_data_ready(struct sock *sk, int count_unused)
193 {
194         struct connection *con = sock2con(sk);
195
196         if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
197                 queue_work(recv_workqueue, &con->rwork);
198 }
199
200 static void lowcomms_write_space(struct sock *sk)
201 {
202         struct connection *con = sock2con(sk);
203
204         if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
205                 queue_work(send_workqueue, &con->swork);
206 }
207
208 static inline void lowcomms_connect_sock(struct connection *con)
209 {
210         if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
211                 queue_work(send_workqueue, &con->swork);
212 }
213
214 static void lowcomms_state_change(struct sock *sk)
215 {
216         if (sk->sk_state == TCP_ESTABLISHED)
217                 lowcomms_write_space(sk);
218 }
219
220 /* Make a socket active */
221 static int add_sock(struct socket *sock, struct connection *con)
222 {
223         con->sock = sock;
224
225         /* Install a data_ready callback */
226         con->sock->sk->sk_data_ready = lowcomms_data_ready;
227         con->sock->sk->sk_write_space = lowcomms_write_space;
228         con->sock->sk->sk_state_change = lowcomms_state_change;
229
230         return 0;
231 }
232
233 /* Add the port number to an IPv4 or IPv6 sockaddr and return the
234    address length */
235 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
236                           int *addr_len)
237 {
238         saddr->ss_family =  dlm_local_addr.ss_family;
239         if (saddr->ss_family == AF_INET) {
240                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
241                 in4_addr->sin_port = cpu_to_be16(port);
242                 *addr_len = sizeof(struct sockaddr_in);
243         } else {
244                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
245                 in6_addr->sin6_port = cpu_to_be16(port);
246                 *addr_len = sizeof(struct sockaddr_in6);
247         }
248 }
249
250 /* Close a remote connection and tidy up */
251 static void close_connection(struct connection *con, bool and_other)
252 {
253         down_write(&con->sock_sem);
254
255         if (con->sock) {
256                 sock_release(con->sock);
257                 con->sock = NULL;
258         }
259         if (con->othercon && and_other) {
260                 /* Will only re-enter once. */
261                 close_connection(con->othercon, false);
262         }
263         if (con->rx_page) {
264                 __free_page(con->rx_page);
265                 con->rx_page = NULL;
266         }
267         con->retries = 0;
268         up_write(&con->sock_sem);
269 }
270
271 /* Data received from remote end */
272 static int receive_from_sock(struct connection *con)
273 {
274         int ret = 0;
275         struct msghdr msg;
276         struct iovec iov[2];
277         mm_segment_t fs;
278         unsigned len;
279         int r;
280         int call_again_soon = 0;
281
282         down_read(&con->sock_sem);
283
284         if (con->sock == NULL)
285                 goto out;
286         if (con->rx_page == NULL) {
287                 /*
288                  * This doesn't need to be atomic, but I think it should
289                  * improve performance if it is.
290                  */
291                 con->rx_page = alloc_page(GFP_ATOMIC);
292                 if (con->rx_page == NULL)
293                         goto out_resched;
294                 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
295         }
296
297         msg.msg_control = NULL;
298         msg.msg_controllen = 0;
299         msg.msg_iovlen = 1;
300         msg.msg_iov = iov;
301         msg.msg_name = NULL;
302         msg.msg_namelen = 0;
303         msg.msg_flags = 0;
304
305         /*
306          * iov[0] is the bit of the circular buffer between the current end
307          * point (cb.base + cb.len) and the end of the buffer.
308          */
309         iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
310         iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
311         iov[1].iov_len = 0;
312
313         /*
314          * iov[1] is the bit of the circular buffer between the start of the
315          * buffer and the start of the currently used section (cb.base)
316          */
317         if (cbuf_data(&con->cb) >= con->cb.base) {
318                 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
319                 iov[1].iov_len = con->cb.base;
320                 iov[1].iov_base = page_address(con->rx_page);
321                 msg.msg_iovlen = 2;
322         }
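	/*
	 * Worked example (illustrative, assuming PAGE_CACHE_SIZE == 4096):
	 * with cb.base == 300 and cb.len == 700, cbuf_data() == 1000 >= base,
	 * so the free space is split into iov[0] = [1000, 4096) and
	 * iov[1] = [0, 300).  With cb.base == 3800 and cb.len == 500 the used
	 * region wraps, cbuf_data() == 204 < base, and a single iov covering
	 * [204, 3800) describes all the free space.
	 */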
323         len = iov[0].iov_len + iov[1].iov_len;
324
325         fs = get_fs();
326         set_fs(get_ds());
327         r = ret = sock_recvmsg(con->sock, &msg, len,
328                                MSG_DONTWAIT | MSG_NOSIGNAL);
329         set_fs(fs);
330
331         if (ret <= 0)
332                 goto out_close;
333         if (ret == len)
334                 call_again_soon = 1;
335         cbuf_add(&con->cb, ret);
336         ret = dlm_process_incoming_buffer(con->nodeid,
337                                           page_address(con->rx_page),
338                                           con->cb.base, con->cb.len,
339                                           PAGE_CACHE_SIZE);
340         if (ret == -EBADMSG) {
341                 printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
342                        "iov_len=%u, iov_base[0]=%p, read=%d\n",
343                        page_address(con->rx_page), con->cb.base, con->cb.len,
344                        len, iov[0].iov_base, r);
345         }
346         if (ret < 0)
347                 goto out_close;
348         cbuf_eat(&con->cb, ret);
349
350         if (cbuf_empty(&con->cb) && !call_again_soon) {
351                 __free_page(con->rx_page);
352                 con->rx_page = NULL;
353         }
354
355 out:
356         if (call_again_soon)
357                 goto out_resched;
358         up_read(&con->sock_sem);
359         return 0;
360
361 out_resched:
362         if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
363                 queue_work(recv_workqueue, &con->rwork);
364         up_read(&con->sock_sem);
365         cond_resched();
366         return 0;
367
368 out_close:
369         up_read(&con->sock_sem);
370         if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
371                 close_connection(con, false);
372                 /* Reconnect when there is something to send */
373         }
374
375         return ret;
376 }
377
378 /* Listening socket is busy, accept a connection */
379 static int accept_from_sock(struct connection *con)
380 {
381         int result;
382         struct sockaddr_storage peeraddr;
383         struct socket *newsock;
384         int len;
385         int nodeid;
386         struct connection *newcon;
387
388         memset(&peeraddr, 0, sizeof(peeraddr));
389         result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
390                                   IPPROTO_TCP, &newsock);
391         if (result < 0)
392                 return -ENOMEM;
393
394         down_read(&con->sock_sem);
395
396         result = -ENOTCONN;
397         if (con->sock == NULL)
398                 goto accept_err;
399
400         newsock->type = con->sock->type;
401         newsock->ops = con->sock->ops;
402
403         result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
404         if (result < 0)
405                 goto accept_err;
406
407         /* Get the connected socket's peer */
408         memset(&peeraddr, 0, sizeof(peeraddr));
409         if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
410                                   &len, 2)) {
411                 result = -ECONNABORTED;
412                 goto accept_err;
413         }
414
415         /* Get the new node's NODEID */
416         make_sockaddr(&peeraddr, 0, &len);
417         if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
418                 printk("dlm: connect from non cluster node\n");
419                 sock_release(newsock);
420                 up_read(&con->sock_sem);
421                 return -1;
422         }
423
424         log_print("got connection from %d", nodeid);
425
426         /*  Check to see if we already have a connection to this node. This
427          *  could happen if the two nodes initiate a connection at roughly
428          *  the same time and the connections cross on the wire.
429          * TEMPORARY FIX:
430          *  In this case we store the incoming one in "othercon"
431          */
432         newcon = nodeid2con(nodeid, GFP_KERNEL);
433         if (!newcon) {
434                 result = -ENOMEM;
435                 goto accept_err;
436         }
437         down_write(&newcon->sock_sem);
438         if (newcon->sock) {
439                 struct connection *othercon = newcon->othercon;
440
441                 if (!othercon) {
442                         othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
443                         if (!othercon) {
444                                 printk("dlm: failed to allocate incoming socket\n");
445                                 up_write(&newcon->sock_sem);
446                                 result = -ENOMEM;
447                                 goto accept_err;
448                         }
449                         othercon->nodeid = nodeid;
450                         othercon->rx_action = receive_from_sock;
451                         init_rwsem(&othercon->sock_sem);
452                         INIT_WORK(&othercon->swork, process_send_sockets);
453                         INIT_WORK(&othercon->rwork, process_recv_sockets);
454                         set_bit(CF_IS_OTHERCON, &othercon->flags);
455                         newcon->othercon = othercon;
456                 }
457                 othercon->sock = newsock;
458                 newsock->sk->sk_user_data = othercon;
459                 add_sock(newsock, othercon);
460         }
461         else {
462                 newsock->sk->sk_user_data = newcon;
463                 newcon->rx_action = receive_from_sock;
464                 add_sock(newsock, newcon);
465
466         }
467
468         up_write(&newcon->sock_sem);
469
470         /*
471          * Add it to the active queue in case we got data
472          * between processing the accept and adding the socket
473          * to the recv workqueue
474          */
475         if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags))
476                 queue_work(recv_workqueue, &newcon->rwork);
477         up_read(&con->sock_sem);
478
479         return 0;
480
481 accept_err:
482         up_read(&con->sock_sem);
483         sock_release(newsock);
484
485         if (result != -EAGAIN)
486                 printk("dlm: error accepting connection from node: %d\n", result);
487         return result;
488 }
489
490 /* Connect a new socket to its peer */
491 static void connect_to_sock(struct connection *con)
492 {
493         int result = -EHOSTUNREACH;
494         struct sockaddr_storage saddr;
495         int addr_len;
496         struct socket *sock;
497
498         if (con->nodeid == 0) {
499                 log_print("attempt to connect sock 0 foiled");
500                 return;
501         }
502
503         down_write(&con->sock_sem);
504         if (con->retries++ > MAX_CONNECT_RETRIES)
505                 goto out;
506
507         /* Some odd races can cause double-connects, ignore them */
508         if (con->sock) {
509                 result = 0;
510                 goto out;
511         }
512
513         /* Create a socket to communicate with */
514         result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
515                                   IPPROTO_TCP, &sock);
516         if (result < 0)
517                 goto out_err;
518
519         memset(&saddr, 0, sizeof(saddr));
520         if (dlm_nodeid_to_addr(con->nodeid, &saddr))
521                 goto out_err;
522
523         sock->sk->sk_user_data = con;
524         con->rx_action = receive_from_sock;
525
526         make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
527
528         add_sock(sock, con);
529
530         log_print("connecting to %d", con->nodeid);
531         result =
532                 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
533                                    O_NONBLOCK);
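	/*
	 * The connect is non-blocking, so -EINPROGRESS is the expected result
	 * here; when the handshake completes, sk_state_change()
	 * (lowcomms_state_change) queues the send work to flush anything
	 * already sitting on the writequeue.
	 */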
534         if (result == -EINPROGRESS)
535                 result = 0;
536         if (result == 0)
537                 goto out;
538
539 out_err:
540         if (con->sock) {
541                 sock_release(con->sock);
542                 con->sock = NULL;
543         }
544         /*
545          * Some errors are fatal and this list might need adjusting. For other
546          * errors we try again until the max number of retries is reached.
547          */
548         if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
549             result != -ENETDOWN && result != -EINVAL &&
550             result != -EPROTONOSUPPORT) {
551                 lowcomms_connect_sock(con);
552                 result = 0;
553         }
554 out:
555         up_write(&con->sock_sem);
556         return;
557 }
558
559 static struct socket *create_listen_sock(struct connection *con,
560                                          struct sockaddr_storage *saddr)
561 {
562         struct socket *sock = NULL;
563         mm_segment_t fs;
564         int result = 0;
565         int one = 1;
566         int addr_len;
567
568         if (dlm_local_addr.ss_family == AF_INET)
569                 addr_len = sizeof(struct sockaddr_in);
570         else
571                 addr_len = sizeof(struct sockaddr_in6);
572
573         /* Create a socket to communicate with */
574         result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
575         if (result < 0) {
576                 printk("dlm: Can't create listening comms socket\n");
577                 goto create_out;
578         }
579
580         fs = get_fs();
581         set_fs(get_ds());
582         result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
583                                  (char *)&one, sizeof(one));
584         set_fs(fs);
585         if (result < 0) {
586                 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
587                        result);
588         }
589         sock->sk->sk_user_data = con;
590         con->rx_action = accept_from_sock;
591         con->sock = sock;
592
593         /* Bind to our port */
594         make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
595         result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
596         if (result < 0) {
597                 printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
598                 sock_release(sock);
599                 sock = NULL;
600                 con->sock = NULL;
601                 goto create_out;
602         }
603
604         fs = get_fs();
605         set_fs(get_ds());
606
607         result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
608                                  (char *)&one, sizeof(one));
609         set_fs(fs);
610         if (result < 0) {
611                 printk("dlm: Set keepalive failed: %d\n", result);
612         }
613
614         result = sock->ops->listen(sock, 5);
615         if (result < 0) {
616                 printk("dlm: Can't listen on port %d\n",
617                        dlm_config.ci_tcp_port);
618                 sock_release(sock);
619                 sock = NULL;
620                 goto create_out;
621         }
622
623 create_out:
624         return sock;
625 }
626
627
628 /* Listen on all interfaces */
629 static int listen_for_all(void)
630 {
631         struct socket *sock = NULL;
632         struct connection *con = nodeid2con(0, GFP_KERNEL);
633         int result = -EINVAL;
634
635         /* We don't support multi-homed hosts */
636         set_bit(CF_IS_OTHERCON, &con->flags);
637
638         sock = create_listen_sock(con, &dlm_local_addr);
639         if (sock) {
640                 add_sock(sock, con);
641                 result = 0;
642         }
643         else {
644                 result = -EADDRINUSE;
645         }
646
647         return result;
648 }
649
650
651
652 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
653                                                      gfp_t allocation)
654 {
655         struct writequeue_entry *entry;
656
657         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
658         if (!entry)
659                 return NULL;
660
661         entry->page = alloc_page(allocation);
662         if (!entry->page) {
663                 kfree(entry);
664                 return NULL;
665         }
666
667         entry->offset = 0;
668         entry->len = 0;
669         entry->end = 0;
670         entry->users = 0;
671         entry->con = con;
672
673         return entry;
674 }
675
676 void *dlm_lowcomms_get_buffer(int nodeid, int len,
677                               gfp_t allocation, char **ppc)
678 {
679         struct connection *con;
680         struct writequeue_entry *e;
681         int offset = 0;
682         int users = 0;
683
684         con = nodeid2con(nodeid, allocation);
685         if (!con)
686                 return NULL;
687
688         spin_lock(&con->writequeue_lock);
689         e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
690         if ((&e->list == &con->writequeue) ||
691             (PAGE_CACHE_SIZE - e->end < len)) {
692                 e = NULL;
693         } else {
694                 offset = e->end;
695                 e->end += len;
696                 users = e->users++;
697         }
698         spin_unlock(&con->writequeue_lock);
699
700         if (e) {
701         got_one:
702                 if (users == 0)
703                         kmap(e->page);
704                 *ppc = page_address(e->page) + offset;
705                 return e;
706         }
707
708         e = new_writequeue_entry(con, allocation);
709         if (e) {
710                 spin_lock(&con->writequeue_lock);
711                 offset = e->end;
712                 e->end += len;
713                 users = e->users++;
714                 list_add_tail(&e->list, &con->writequeue);
715                 spin_unlock(&con->writequeue_lock);
716                 goto got_one;
717         }
718         return NULL;
719 }
720
721 void dlm_lowcomms_commit_buffer(void *mh)
722 {
723         struct writequeue_entry *e = (struct writequeue_entry *)mh;
724         struct connection *con = e->con;
725         int users;
726
727         spin_lock(&con->writequeue_lock);
728         users = --e->users;
729         if (users)
730                 goto out;
731         e->len = e->end - e->offset;
732         kunmap(e->page);
733         spin_unlock(&con->writequeue_lock);
734
735         if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
736                 queue_work(send_workqueue, &con->swork);
737         }
738         return;
739
740 out:
741         spin_unlock(&con->writequeue_lock);
742         return;
743 }
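
/*
 * A minimal usage sketch for the two functions above (hypothetical caller,
 * not code from the dlm core; "msg" and "len" are assumed to be a prebuilt
 * message and its length):
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_KERNEL, &p);
 *	if (mh) {
 *		memcpy(p, msg, len);             copy the message into the page
 *		dlm_lowcomms_commit_buffer(mh);  queue it for the send worker
 *	}
 */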
744
745 static void free_entry(struct writequeue_entry *e)
746 {
747         __free_page(e->page);
748         kfree(e);
749 }
750
751 /* Send a message */
752 static void send_to_sock(struct connection *con)
753 {
754         int ret = 0;
755         ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
756         const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
757         struct writequeue_entry *e;
758         int len, offset;
759
760         down_read(&con->sock_sem);
761         if (con->sock == NULL)
762                 goto out_connect;
763
764         sendpage = con->sock->ops->sendpage;
765
766         spin_lock(&con->writequeue_lock);
767         for (;;) {
768                 e = list_entry(con->writequeue.next, struct writequeue_entry,
769                                list);
770                 if ((struct list_head *) e == &con->writequeue)
771                         break;
772
773                 len = e->len;
774                 offset = e->offset;
775                 BUG_ON(len == 0 && e->users == 0);
776                 spin_unlock(&con->writequeue_lock);
777                 kmap(e->page);
778
779                 ret = 0;
780                 if (len) {
781                         ret = sendpage(con->sock, e->page, offset, len,
782                                        msg_flags);
783                         if (ret == -EAGAIN || ret == 0)
784                                 goto out;
785                         if (ret <= 0)
786                                 goto send_error;
787                 }
788                 else {
789                         /* Don't starve people filling buffers */
790                         cond_resched();
791                 }
792
793                 spin_lock(&con->writequeue_lock);
794                 e->offset += ret;
795                 e->len -= ret;
796
797                 if (e->len == 0 && e->users == 0) {
798                         list_del(&e->list);
799                         kunmap(e->page);
800                         free_entry(e);
801                         continue;
802                 }
803         }
804         spin_unlock(&con->writequeue_lock);
805 out:
806         up_read(&con->sock_sem);
807         return;
808
809 send_error:
810         up_read(&con->sock_sem);
811         close_connection(con, false);
812         lowcomms_connect_sock(con);
813         return;
814
815 out_connect:
816         up_read(&con->sock_sem);
817         lowcomms_connect_sock(con);
818         return;
819 }
820
821 static void clean_one_writequeue(struct connection *con)
822 {
823         struct list_head *list;
824         struct list_head *temp;
825
826         spin_lock(&con->writequeue_lock);
827         list_for_each_safe(list, temp, &con->writequeue) {
828                 struct writequeue_entry *e =
829                         list_entry(list, struct writequeue_entry, list);
830                 list_del(&e->list);
831                 free_entry(e);
832         }
833         spin_unlock(&con->writequeue_lock);
834 }
835
836 /* Called from recovery when it knows that a node has
837    left the cluster */
838 int dlm_lowcomms_close(int nodeid)
839 {
840         struct connection *con;
841
842         if (!connections)
843                 goto out;
844
845         log_print("closing connection to node %d", nodeid);
846         con = nodeid2con(nodeid, 0);
847         if (con) {
848                 clean_one_writequeue(con);
849                 close_connection(con, true);
850                 atomic_set(&con->waiting_requests, 0);
851         }
852         return 0;
853
854 out:
855         return -1;
856 }
857
858 /* Look for activity on active sockets */
859 static void process_recv_sockets(struct work_struct *work)
860 {
861         struct connection *con = container_of(work, struct connection, rwork);
862         int err;
863
864         clear_bit(CF_READ_PENDING, &con->flags);
865         do {
866                 err = con->rx_action(con);
867         } while (!err);
868 }
869
870
871 static void process_send_sockets(struct work_struct *work)
872 {
873         struct connection *con = container_of(work, struct connection, swork);
874
875         if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
876                 connect_to_sock(con);
877         }
878
879         if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) {
880                 send_to_sock(con);
881         }
882 }
883
884
885 /* Discard all entries on the write queues */
886 static void clean_writequeues(void)
887 {
888         int nodeid;
889
890         for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
891                 struct connection *con = nodeid2con(nodeid, 0);
892
893                 if (con)
894                         clean_one_writequeue(con);
895         }
896 }
897
898 static void work_stop(void)
899 {
900         destroy_workqueue(recv_workqueue);
901         destroy_workqueue(send_workqueue);
902 }
903
904 static int work_start(void)
905 {
906         int error = -ENOMEM;
907
908         /* create_workqueue() returns NULL on failure, not an ERR_PTR() */
909         recv_workqueue = create_workqueue("dlm_recv");
910         if (!recv_workqueue) {
911                 log_print("can't start dlm_recv %d", error);
912                 return error;
913         }
914
915         send_workqueue = create_singlethread_workqueue("dlm_send");
916         if (!send_workqueue) {
917                 log_print("can't start dlm_send %d", error);
918                 destroy_workqueue(recv_workqueue);
919                 return error;
920         }
921
922         return 0;
923 }
924
925 void dlm_lowcomms_stop(void)
926 {
927         int i;
928
929         /* Set all the flags to prevent any
930            socket activity.
931         */
932         for (i = 0; i < conn_array_size; i++) {
933                 if (connections[i])
934                         connections[i]->flags |= 0xFF;
935         }
936
937         work_stop();
938         clean_writequeues();
939
940         for (i = 0; i < conn_array_size; i++) {
941                 if (connections[i]) {
942                         close_connection(connections[i], true);
943                         if (connections[i]->othercon)
944                                 kmem_cache_free(con_cache, connections[i]->othercon);
945                         kmem_cache_free(con_cache, connections[i]);
946                 }
947         }
948
949         kfree(connections);
950         connections = NULL;
951
952         kmem_cache_destroy(con_cache);
953 }
954
955 /* This is quite likely to sleep... */
956 int dlm_lowcomms_start(void)
957 {
958         int error = 0;
959
960         error = -ENOMEM;
961         connections = kzalloc(sizeof(struct connection *) *
962                               NODE_INCREMENT, GFP_KERNEL);
963         if (!connections)
964                 goto out;
965
966         conn_array_size = NODE_INCREMENT;
967
968         if (dlm_our_addr(&dlm_local_addr, 0)) {
969                 log_print("no local IP address has been set");
970                 goto fail_free_conn;
971         }
972         if (!dlm_our_addr(&dlm_local_addr, 1)) {
973                 log_print("This dlm comms module does not support multi-homed clustering");
974                 goto fail_free_conn;
975         }
976
977         con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
978                                       __alignof__(struct connection), 0,
979                                       NULL, NULL);
980         if (!con_cache)
981                 goto fail_free_conn;
982
983
984         /* Start listening */
985         error = listen_for_all();
986         if (error)
987                 goto fail_unlisten;
988
989         error = work_start();
990         if (error)
991                 goto fail_unlisten;
992
993         return 0;
994
995 fail_unlisten:
996         close_connection(connections[0], false);
997         kmem_cache_free(con_cache, connections[0]);
998         kmem_cache_destroy(con_cache);
999
1000 fail_free_conn:
1001         kfree(connections);
1002
1003 out:
1004         return error;
1005 }
1006
1007 /*
1008  * Overrides for Emacs so that we follow Linus's tabbing style.
1009  * Emacs will notice this stuff at the end of the file and automatically
1010  * adjust the settings for this buffer only.  This must remain at the end
1011  * of the file.
1012  * ---------------------------------------------------------------------------
1013  * Local variables:
1014  * c-file-style: "linux"
1015  * End:
1016  */