]> err.no Git - linux-2.6/blob - net/sunrpc/svcsock.c
[TCP]: Move the tcp sock states to net/tcp_states.h
[linux-2.6] / net / sunrpc / svcsock.c
1 /*
2  * linux/net/sunrpc/svcsock.c
3  *
4  * These are the RPC server socket internals.
5  *
6  * The server scheduling algorithm does not always distribute the load
7  * evenly when servicing a single client. May need to modify the
8  * svc_sock_enqueue procedure...
9  *
10  * TCP support is largely untested and may be a little slow. The problem
11  * is that we currently do two separate recvfrom's, one for the 4-byte
12  * record length, and the second for the actual record. This could possibly
13  * be improved by always reading a minimum size of around 100 bytes and
14  * tucking any superfluous bytes away in a temporary store. Still, that
15  * leaves write requests out in the rain. An alternative may be to peek at
16  * the first skb in the queue, and if it matches the next TCP sequence
17  * number, to extract the record marker. Yuck.
18  *
19  * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
20  */
21
22 #include <linux/sched.h>
23 #include <linux/errno.h>
24 #include <linux/fcntl.h>
25 #include <linux/net.h>
26 #include <linux/in.h>
27 #include <linux/inet.h>
28 #include <linux/udp.h>
29 #include <linux/unistd.h>
30 #include <linux/slab.h>
31 #include <linux/netdevice.h>
32 #include <linux/skbuff.h>
33 #include <net/sock.h>
34 #include <net/checksum.h>
35 #include <net/ip.h>
36 #include <net/tcp_states.h>
37 #include <asm/uaccess.h>
38 #include <asm/ioctls.h>
39
40 #include <linux/sunrpc/types.h>
41 #include <linux/sunrpc/xdr.h>
42 #include <linux/sunrpc/svcsock.h>
43 #include <linux/sunrpc/stats.h>
44
45 /* SMP locking strategy:
46  *
47  *      svc_serv->sv_lock protects most stuff for that service.
48  *
49  *      Some flags can be set to certain values at any time
50  *      providing that certain rules are followed:
51  *
52  *      SK_BUSY  can be set to 0 at any time.  
53  *              svc_sock_enqueue must be called afterwards
54  *      SK_CONN, SK_DATA, can be set or cleared at any time.
55  *              after a set, svc_sock_enqueue must be called.   
56  *              after a clear, the socket must be read/accepted
57  *               if this succeeds, it must be set again.
58  *      SK_CLOSE can set at any time. It is never cleared.
59  *
60  */
61
62 #define RPCDBG_FACILITY RPCDBG_SVCSOCK
63
64
65 static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *,
66                                          int *errp, int pmap_reg);
67 static void             svc_udp_data_ready(struct sock *, int);
68 static int              svc_udp_recvfrom(struct svc_rqst *);
69 static int              svc_udp_sendto(struct svc_rqst *);
70
71 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk);
72 static int svc_deferred_recv(struct svc_rqst *rqstp);
73 static struct cache_deferred_req *svc_defer(struct cache_req *req);
74
75 /*
76  * Queue up an idle server thread.  Must have serv->sv_lock held.
77  * Note: this is really a stack rather than a queue, so that we only
78  * use as many different threads as we need, and the rest don't polute
79  * the cache.
80  */
81 static inline void
82 svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp)
83 {
84         list_add(&rqstp->rq_list, &serv->sv_threads);
85 }
86
87 /*
88  * Dequeue an nfsd thread.  Must have serv->sv_lock held.
89  */
90 static inline void
91 svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp)
92 {
93         list_del(&rqstp->rq_list);
94 }
95
96 /*
97  * Release an skbuff after use
98  */
99 static inline void
100 svc_release_skb(struct svc_rqst *rqstp)
101 {
102         struct sk_buff *skb = rqstp->rq_skbuff;
103         struct svc_deferred_req *dr = rqstp->rq_deferred;
104
105         if (skb) {
106                 rqstp->rq_skbuff = NULL;
107
108                 dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
109                 skb_free_datagram(rqstp->rq_sock->sk_sk, skb);
110         }
111         if (dr) {
112                 rqstp->rq_deferred = NULL;
113                 kfree(dr);
114         }
115 }
116
117 /*
118  * Any space to write?
119  */
120 static inline unsigned long
121 svc_sock_wspace(struct svc_sock *svsk)
122 {
123         int wspace;
124
125         if (svsk->sk_sock->type == SOCK_STREAM)
126                 wspace = sk_stream_wspace(svsk->sk_sk);
127         else
128                 wspace = sock_wspace(svsk->sk_sk);
129
130         return wspace;
131 }
132
133 /*
134  * Queue up a socket with data pending. If there are idle nfsd
135  * processes, wake 'em up.
136  *
137  */
138 static void
139 svc_sock_enqueue(struct svc_sock *svsk)
140 {
141         struct svc_serv *serv = svsk->sk_server;
142         struct svc_rqst *rqstp;
143
144         if (!(svsk->sk_flags &
145               ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)|(1<<SK_DEFERRED)) ))
146                 return;
147         if (test_bit(SK_DEAD, &svsk->sk_flags))
148                 return;
149
150         spin_lock_bh(&serv->sv_lock);
151
152         if (!list_empty(&serv->sv_threads) && 
153             !list_empty(&serv->sv_sockets))
154                 printk(KERN_ERR
155                         "svc_sock_enqueue: threads and sockets both waiting??\n");
156
157         if (test_bit(SK_DEAD, &svsk->sk_flags)) {
158                 /* Don't enqueue dead sockets */
159                 dprintk("svc: socket %p is dead, not enqueued\n", svsk->sk_sk);
160                 goto out_unlock;
161         }
162
163         if (test_bit(SK_BUSY, &svsk->sk_flags)) {
164                 /* Don't enqueue socket while daemon is receiving */
165                 dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
166                 goto out_unlock;
167         }
168
169         set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
170         if (((svsk->sk_reserved + serv->sv_bufsz)*2
171              > svc_sock_wspace(svsk))
172             && !test_bit(SK_CLOSE, &svsk->sk_flags)
173             && !test_bit(SK_CONN, &svsk->sk_flags)) {
174                 /* Don't enqueue while not enough space for reply */
175                 dprintk("svc: socket %p  no space, %d*2 > %ld, not enqueued\n",
176                         svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz,
177                         svc_sock_wspace(svsk));
178                 goto out_unlock;
179         }
180         clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
181
182         /* Mark socket as busy. It will remain in this state until the
183          * server has processed all pending data and put the socket back
184          * on the idle list.
185          */
186         set_bit(SK_BUSY, &svsk->sk_flags);
187
188         if (!list_empty(&serv->sv_threads)) {
189                 rqstp = list_entry(serv->sv_threads.next,
190                                    struct svc_rqst,
191                                    rq_list);
192                 dprintk("svc: socket %p served by daemon %p\n",
193                         svsk->sk_sk, rqstp);
194                 svc_serv_dequeue(serv, rqstp);
195                 if (rqstp->rq_sock)
196                         printk(KERN_ERR 
197                                 "svc_sock_enqueue: server %p, rq_sock=%p!\n",
198                                 rqstp, rqstp->rq_sock);
199                 rqstp->rq_sock = svsk;
200                 svsk->sk_inuse++;
201                 rqstp->rq_reserved = serv->sv_bufsz;
202                 svsk->sk_reserved += rqstp->rq_reserved;
203                 wake_up(&rqstp->rq_wait);
204         } else {
205                 dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
206                 list_add_tail(&svsk->sk_ready, &serv->sv_sockets);
207         }
208
209 out_unlock:
210         spin_unlock_bh(&serv->sv_lock);
211 }
212
213 /*
214  * Dequeue the first socket.  Must be called with the serv->sv_lock held.
215  */
216 static inline struct svc_sock *
217 svc_sock_dequeue(struct svc_serv *serv)
218 {
219         struct svc_sock *svsk;
220
221         if (list_empty(&serv->sv_sockets))
222                 return NULL;
223
224         svsk = list_entry(serv->sv_sockets.next,
225                           struct svc_sock, sk_ready);
226         list_del_init(&svsk->sk_ready);
227
228         dprintk("svc: socket %p dequeued, inuse=%d\n",
229                 svsk->sk_sk, svsk->sk_inuse);
230
231         return svsk;
232 }
233
234 /*
235  * Having read something from a socket, check whether it
236  * needs to be re-enqueued.
237  * Note: SK_DATA only gets cleared when a read-attempt finds
238  * no (or insufficient) data.
239  */
240 static inline void
241 svc_sock_received(struct svc_sock *svsk)
242 {
243         clear_bit(SK_BUSY, &svsk->sk_flags);
244         svc_sock_enqueue(svsk);
245 }
246
247
248 /**
249  * svc_reserve - change the space reserved for the reply to a request.
250  * @rqstp:  The request in question
251  * @space: new max space to reserve
252  *
253  * Each request reserves some space on the output queue of the socket
254  * to make sure the reply fits.  This function reduces that reserved
255  * space to be the amount of space used already, plus @space.
256  *
257  */
258 void svc_reserve(struct svc_rqst *rqstp, int space)
259 {
260         space += rqstp->rq_res.head[0].iov_len;
261
262         if (space < rqstp->rq_reserved) {
263                 struct svc_sock *svsk = rqstp->rq_sock;
264                 spin_lock_bh(&svsk->sk_server->sv_lock);
265                 svsk->sk_reserved -= (rqstp->rq_reserved - space);
266                 rqstp->rq_reserved = space;
267                 spin_unlock_bh(&svsk->sk_server->sv_lock);
268
269                 svc_sock_enqueue(svsk);
270         }
271 }
272
273 /*
274  * Release a socket after use.
275  */
276 static inline void
277 svc_sock_put(struct svc_sock *svsk)
278 {
279         struct svc_serv *serv = svsk->sk_server;
280
281         spin_lock_bh(&serv->sv_lock);
282         if (!--(svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) {
283                 spin_unlock_bh(&serv->sv_lock);
284                 dprintk("svc: releasing dead socket\n");
285                 sock_release(svsk->sk_sock);
286                 kfree(svsk);
287         }
288         else
289                 spin_unlock_bh(&serv->sv_lock);
290 }
291
292 static void
293 svc_sock_release(struct svc_rqst *rqstp)
294 {
295         struct svc_sock *svsk = rqstp->rq_sock;
296
297         svc_release_skb(rqstp);
298
299         svc_free_allpages(rqstp);
300         rqstp->rq_res.page_len = 0;
301         rqstp->rq_res.page_base = 0;
302
303
304         /* Reset response buffer and release
305          * the reservation.
306          * But first, check that enough space was reserved
307          * for the reply, otherwise we have a bug!
308          */
309         if ((rqstp->rq_res.len) >  rqstp->rq_reserved)
310                 printk(KERN_ERR "RPC request reserved %d but used %d\n",
311                        rqstp->rq_reserved,
312                        rqstp->rq_res.len);
313
314         rqstp->rq_res.head[0].iov_len = 0;
315         svc_reserve(rqstp, 0);
316         rqstp->rq_sock = NULL;
317
318         svc_sock_put(svsk);
319 }
320
321 /*
322  * External function to wake up a server waiting for data
323  */
324 void
325 svc_wake_up(struct svc_serv *serv)
326 {
327         struct svc_rqst *rqstp;
328
329         spin_lock_bh(&serv->sv_lock);
330         if (!list_empty(&serv->sv_threads)) {
331                 rqstp = list_entry(serv->sv_threads.next,
332                                    struct svc_rqst,
333                                    rq_list);
334                 dprintk("svc: daemon %p woken up.\n", rqstp);
335                 /*
336                 svc_serv_dequeue(serv, rqstp);
337                 rqstp->rq_sock = NULL;
338                  */
339                 wake_up(&rqstp->rq_wait);
340         }
341         spin_unlock_bh(&serv->sv_lock);
342 }
343
344 /*
345  * Generic sendto routine
346  */
347 static int
348 svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
349 {
350         struct svc_sock *svsk = rqstp->rq_sock;
351         struct socket   *sock = svsk->sk_sock;
352         int             slen;
353         char            buffer[CMSG_SPACE(sizeof(struct in_pktinfo))];
354         struct cmsghdr *cmh = (struct cmsghdr *)buffer;
355         struct in_pktinfo *pki = (struct in_pktinfo *)CMSG_DATA(cmh);
356         int             len = 0;
357         int             result;
358         int             size;
359         struct page     **ppage = xdr->pages;
360         size_t          base = xdr->page_base;
361         unsigned int    pglen = xdr->page_len;
362         unsigned int    flags = MSG_MORE;
363
364         slen = xdr->len;
365
366         if (rqstp->rq_prot == IPPROTO_UDP) {
367                 /* set the source and destination */
368                 struct msghdr   msg;
369                 msg.msg_name    = &rqstp->rq_addr;
370                 msg.msg_namelen = sizeof(rqstp->rq_addr);
371                 msg.msg_iov     = NULL;
372                 msg.msg_iovlen  = 0;
373                 msg.msg_flags   = MSG_MORE;
374
375                 msg.msg_control = cmh;
376                 msg.msg_controllen = sizeof(buffer);
377                 cmh->cmsg_len = CMSG_LEN(sizeof(*pki));
378                 cmh->cmsg_level = SOL_IP;
379                 cmh->cmsg_type = IP_PKTINFO;
380                 pki->ipi_ifindex = 0;
381                 pki->ipi_spec_dst.s_addr = rqstp->rq_daddr;
382
383                 if (sock_sendmsg(sock, &msg, 0) < 0)
384                         goto out;
385         }
386
387         /* send head */
388         if (slen == xdr->head[0].iov_len)
389                 flags = 0;
390         len = sock->ops->sendpage(sock, rqstp->rq_respages[0], 0, xdr->head[0].iov_len, flags);
391         if (len != xdr->head[0].iov_len)
392                 goto out;
393         slen -= xdr->head[0].iov_len;
394         if (slen == 0)
395                 goto out;
396
397         /* send page data */
398         size = PAGE_SIZE - base < pglen ? PAGE_SIZE - base : pglen;
399         while (pglen > 0) {
400                 if (slen == size)
401                         flags = 0;
402                 result = sock->ops->sendpage(sock, *ppage, base, size, flags);
403                 if (result > 0)
404                         len += result;
405                 if (result != size)
406                         goto out;
407                 slen -= size;
408                 pglen -= size;
409                 size = PAGE_SIZE < pglen ? PAGE_SIZE : pglen;
410                 base = 0;
411                 ppage++;
412         }
413         /* send tail */
414         if (xdr->tail[0].iov_len) {
415                 result = sock->ops->sendpage(sock, rqstp->rq_respages[rqstp->rq_restailpage], 
416                                              ((unsigned long)xdr->tail[0].iov_base)& (PAGE_SIZE-1),
417                                              xdr->tail[0].iov_len, 0);
418
419                 if (result > 0)
420                         len += result;
421         }
422 out:
423         dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %x)\n",
424                         rqstp->rq_sock, xdr->head[0].iov_base, xdr->head[0].iov_len, xdr->len, len,
425                 rqstp->rq_addr.sin_addr.s_addr);
426
427         return len;
428 }
429
430 /*
431  * Check input queue length
432  */
433 static int
434 svc_recv_available(struct svc_sock *svsk)
435 {
436         mm_segment_t    oldfs;
437         struct socket   *sock = svsk->sk_sock;
438         int             avail, err;
439
440         oldfs = get_fs(); set_fs(KERNEL_DS);
441         err = sock->ops->ioctl(sock, TIOCINQ, (unsigned long) &avail);
442         set_fs(oldfs);
443
444         return (err >= 0)? avail : err;
445 }
446
447 /*
448  * Generic recvfrom routine.
449  */
450 static int
451 svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, int buflen)
452 {
453         struct msghdr   msg;
454         struct socket   *sock;
455         int             len, alen;
456
457         rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
458         sock = rqstp->rq_sock->sk_sock;
459
460         msg.msg_name    = &rqstp->rq_addr;
461         msg.msg_namelen = sizeof(rqstp->rq_addr);
462         msg.msg_control = NULL;
463         msg.msg_controllen = 0;
464
465         msg.msg_flags   = MSG_DONTWAIT;
466
467         len = kernel_recvmsg(sock, &msg, iov, nr, buflen, MSG_DONTWAIT);
468
469         /* sock_recvmsg doesn't fill in the name/namelen, so we must..
470          * possibly we should cache this in the svc_sock structure
471          * at accept time. FIXME
472          */
473         alen = sizeof(rqstp->rq_addr);
474         sock->ops->getname(sock, (struct sockaddr *)&rqstp->rq_addr, &alen, 1);
475
476         dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
477                 rqstp->rq_sock, iov[0].iov_base, iov[0].iov_len, len);
478
479         return len;
480 }
481
482 /*
483  * Set socket snd and rcv buffer lengths
484  */
485 static inline void
486 svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
487 {
488 #if 0
489         mm_segment_t    oldfs;
490         oldfs = get_fs(); set_fs(KERNEL_DS);
491         sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
492                         (char*)&snd, sizeof(snd));
493         sock_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
494                         (char*)&rcv, sizeof(rcv));
495 #else
496         /* sock_setsockopt limits use to sysctl_?mem_max,
497          * which isn't acceptable.  Until that is made conditional
498          * on not having CAP_SYS_RESOURCE or similar, we go direct...
499          * DaveM said I could!
500          */
501         lock_sock(sock->sk);
502         sock->sk->sk_sndbuf = snd * 2;
503         sock->sk->sk_rcvbuf = rcv * 2;
504         sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
505         release_sock(sock->sk);
506 #endif
507 }
508 /*
509  * INET callback when data has been received on the socket.
510  */
511 static void
512 svc_udp_data_ready(struct sock *sk, int count)
513 {
514         struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);
515
516         if (!svsk)
517                 goto out;
518         dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
519                 svsk, sk, count, test_bit(SK_BUSY, &svsk->sk_flags));
520         set_bit(SK_DATA, &svsk->sk_flags);
521         svc_sock_enqueue(svsk);
522  out:
523         if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
524                 wake_up_interruptible(sk->sk_sleep);
525 }
526
527 /*
528  * INET callback when space is newly available on the socket.
529  */
530 static void
531 svc_write_space(struct sock *sk)
532 {
533         struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);
534
535         if (svsk) {
536                 dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
537                         svsk, sk, test_bit(SK_BUSY, &svsk->sk_flags));
538                 svc_sock_enqueue(svsk);
539         }
540
541         if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
542                 printk(KERN_WARNING "RPC svc_write_space: some sleeping on %p\n",
543                        svsk);
544                 wake_up_interruptible(sk->sk_sleep);
545         }
546 }
547
548 /*
549  * Receive a datagram from a UDP socket.
550  */
551 extern int
552 csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb);
553
554 static int
555 svc_udp_recvfrom(struct svc_rqst *rqstp)
556 {
557         struct svc_sock *svsk = rqstp->rq_sock;
558         struct svc_serv *serv = svsk->sk_server;
559         struct sk_buff  *skb;
560         int             err, len;
561
562         if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
563             /* udp sockets need large rcvbuf as all pending
564              * requests are still in that buffer.  sndbuf must
565              * also be large enough that there is enough space
566              * for one reply per thread.
567              */
568             svc_sock_setbufsize(svsk->sk_sock,
569                                 (serv->sv_nrthreads+3) * serv->sv_bufsz,
570                                 (serv->sv_nrthreads+3) * serv->sv_bufsz);
571
572         if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
573                 svc_sock_received(svsk);
574                 return svc_deferred_recv(rqstp);
575         }
576
577         clear_bit(SK_DATA, &svsk->sk_flags);
578         while ((skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err)) == NULL) {
579                 if (err == -EAGAIN) {
580                         svc_sock_received(svsk);
581                         return err;
582                 }
583                 /* possibly an icmp error */
584                 dprintk("svc: recvfrom returned error %d\n", -err);
585         }
586         if (skb->stamp.tv_sec == 0) {
587                 skb->stamp.tv_sec = xtime.tv_sec; 
588                 skb->stamp.tv_usec = xtime.tv_nsec / NSEC_PER_USEC; 
589                 /* Don't enable netstamp, sunrpc doesn't 
590                    need that much accuracy */
591         }
592         svsk->sk_sk->sk_stamp = skb->stamp;
593         set_bit(SK_DATA, &svsk->sk_flags); /* there may be more data... */
594
595         /*
596          * Maybe more packets - kick another thread ASAP.
597          */
598         svc_sock_received(svsk);
599
600         len  = skb->len - sizeof(struct udphdr);
601         rqstp->rq_arg.len = len;
602
603         rqstp->rq_prot        = IPPROTO_UDP;
604
605         /* Get sender address */
606         rqstp->rq_addr.sin_family = AF_INET;
607         rqstp->rq_addr.sin_port = skb->h.uh->source;
608         rqstp->rq_addr.sin_addr.s_addr = skb->nh.iph->saddr;
609         rqstp->rq_daddr = skb->nh.iph->daddr;
610
611         if (skb_is_nonlinear(skb)) {
612                 /* we have to copy */
613                 local_bh_disable();
614                 if (csum_partial_copy_to_xdr(&rqstp->rq_arg, skb)) {
615                         local_bh_enable();
616                         /* checksum error */
617                         skb_free_datagram(svsk->sk_sk, skb);
618                         return 0;
619                 }
620                 local_bh_enable();
621                 skb_free_datagram(svsk->sk_sk, skb); 
622         } else {
623                 /* we can use it in-place */
624                 rqstp->rq_arg.head[0].iov_base = skb->data + sizeof(struct udphdr);
625                 rqstp->rq_arg.head[0].iov_len = len;
626                 if (skb->ip_summed != CHECKSUM_UNNECESSARY) {
627                         if ((unsigned short)csum_fold(skb_checksum(skb, 0, skb->len, skb->csum))) {
628                                 skb_free_datagram(svsk->sk_sk, skb);
629                                 return 0;
630                         }
631                         skb->ip_summed = CHECKSUM_UNNECESSARY;
632                 }
633                 rqstp->rq_skbuff = skb;
634         }
635
636         rqstp->rq_arg.page_base = 0;
637         if (len <= rqstp->rq_arg.head[0].iov_len) {
638                 rqstp->rq_arg.head[0].iov_len = len;
639                 rqstp->rq_arg.page_len = 0;
640         } else {
641                 rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
642                 rqstp->rq_argused += (rqstp->rq_arg.page_len + PAGE_SIZE - 1)/ PAGE_SIZE;
643         }
644
645         if (serv->sv_stats)
646                 serv->sv_stats->netudpcnt++;
647
648         return len;
649 }
650
651 static int
652 svc_udp_sendto(struct svc_rqst *rqstp)
653 {
654         int             error;
655
656         error = svc_sendto(rqstp, &rqstp->rq_res);
657         if (error == -ECONNREFUSED)
658                 /* ICMP error on earlier request. */
659                 error = svc_sendto(rqstp, &rqstp->rq_res);
660
661         return error;
662 }
663
664 static void
665 svc_udp_init(struct svc_sock *svsk)
666 {
667         svsk->sk_sk->sk_data_ready = svc_udp_data_ready;
668         svsk->sk_sk->sk_write_space = svc_write_space;
669         svsk->sk_recvfrom = svc_udp_recvfrom;
670         svsk->sk_sendto = svc_udp_sendto;
671
672         /* initialise setting must have enough space to
673          * receive and respond to one request.  
674          * svc_udp_recvfrom will re-adjust if necessary
675          */
676         svc_sock_setbufsize(svsk->sk_sock,
677                             3 * svsk->sk_server->sv_bufsz,
678                             3 * svsk->sk_server->sv_bufsz);
679
680         set_bit(SK_DATA, &svsk->sk_flags); /* might have come in before data_ready set up */
681         set_bit(SK_CHNGBUF, &svsk->sk_flags);
682 }
683
684 /*
685  * A data_ready event on a listening socket means there's a connection
686  * pending. Do not use state_change as a substitute for it.
687  */
688 static void
689 svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
690 {
691         struct svc_sock *svsk;
692
693         dprintk("svc: socket %p TCP (listen) state change %d\n",
694                         sk, sk->sk_state);
695
696         if  (sk->sk_state != TCP_LISTEN) {
697                 /*
698                  * This callback may called twice when a new connection
699                  * is established as a child socket inherits everything
700                  * from a parent LISTEN socket.
701                  * 1) data_ready method of the parent socket will be called
702                  *    when one of child sockets become ESTABLISHED.
703                  * 2) data_ready method of the child socket may be called
704                  *    when it receives data before the socket is accepted.
705                  * In case of 2, we should ignore it silently.
706                  */
707                 goto out;
708         }
709         if (!(svsk = (struct svc_sock *) sk->sk_user_data)) {
710                 printk("svc: socket %p: no user data\n", sk);
711                 goto out;
712         }
713         set_bit(SK_CONN, &svsk->sk_flags);
714         svc_sock_enqueue(svsk);
715  out:
716         if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
717                 wake_up_interruptible_all(sk->sk_sleep);
718 }
719
720 /*
721  * A state change on a connected socket means it's dying or dead.
722  */
723 static void
724 svc_tcp_state_change(struct sock *sk)
725 {
726         struct svc_sock *svsk;
727
728         dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
729                         sk, sk->sk_state, sk->sk_user_data);
730
731         if (!(svsk = (struct svc_sock *) sk->sk_user_data)) {
732                 printk("svc: socket %p: no user data\n", sk);
733                 goto out;
734         }
735         set_bit(SK_CLOSE, &svsk->sk_flags);
736         svc_sock_enqueue(svsk);
737  out:
738         if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
739                 wake_up_interruptible_all(sk->sk_sleep);
740 }
741
742 static void
743 svc_tcp_data_ready(struct sock *sk, int count)
744 {
745         struct svc_sock *       svsk;
746
747         dprintk("svc: socket %p TCP data ready (svsk %p)\n",
748                         sk, sk->sk_user_data);
749         if (!(svsk = (struct svc_sock *)(sk->sk_user_data)))
750                 goto out;
751         set_bit(SK_DATA, &svsk->sk_flags);
752         svc_sock_enqueue(svsk);
753  out:
754         if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
755                 wake_up_interruptible(sk->sk_sleep);
756 }
757
758 /*
759  * Accept a TCP connection
760  */
761 static void
762 svc_tcp_accept(struct svc_sock *svsk)
763 {
764         struct sockaddr_in sin;
765         struct svc_serv *serv = svsk->sk_server;
766         struct socket   *sock = svsk->sk_sock;
767         struct socket   *newsock;
768         struct proto_ops *ops;
769         struct svc_sock *newsvsk;
770         int             err, slen;
771
772         dprintk("svc: tcp_accept %p sock %p\n", svsk, sock);
773         if (!sock)
774                 return;
775
776         err = sock_create_lite(PF_INET, SOCK_STREAM, IPPROTO_TCP, &newsock);
777         if (err) {
778                 if (err == -ENOMEM)
779                         printk(KERN_WARNING "%s: no more sockets!\n",
780                                serv->sv_name);
781                 return;
782         }
783
784         dprintk("svc: tcp_accept %p allocated\n", newsock);
785         newsock->ops = ops = sock->ops;
786
787         clear_bit(SK_CONN, &svsk->sk_flags);
788         if ((err = ops->accept(sock, newsock, O_NONBLOCK)) < 0) {
789                 if (err != -EAGAIN && net_ratelimit())
790                         printk(KERN_WARNING "%s: accept failed (err %d)!\n",
791                                    serv->sv_name, -err);
792                 goto failed;            /* aborted connection or whatever */
793         }
794         set_bit(SK_CONN, &svsk->sk_flags);
795         svc_sock_enqueue(svsk);
796
797         slen = sizeof(sin);
798         err = ops->getname(newsock, (struct sockaddr *) &sin, &slen, 1);
799         if (err < 0) {
800                 if (net_ratelimit())
801                         printk(KERN_WARNING "%s: peername failed (err %d)!\n",
802                                    serv->sv_name, -err);
803                 goto failed;            /* aborted connection or whatever */
804         }
805
806         /* Ideally, we would want to reject connections from unauthorized
807          * hosts here, but when we get encription, the IP of the host won't
808          * tell us anything. For now just warn about unpriv connections.
809          */
810         if (ntohs(sin.sin_port) >= 1024) {
811                 dprintk(KERN_WARNING
812                         "%s: connect from unprivileged port: %u.%u.%u.%u:%d\n",
813                         serv->sv_name, 
814                         NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
815         }
816
817         dprintk("%s: connect from %u.%u.%u.%u:%04x\n", serv->sv_name,
818                         NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
819
820         /* make sure that a write doesn't block forever when
821          * low on memory
822          */
823         newsock->sk->sk_sndtimeo = HZ*30;
824
825         if (!(newsvsk = svc_setup_socket(serv, newsock, &err, 0)))
826                 goto failed;
827
828
829         /* make sure that we don't have too many active connections.
830          * If we have, something must be dropped.
831          *
832          * There's no point in trying to do random drop here for
833          * DoS prevention. The NFS clients does 1 reconnect in 15
834          * seconds. An attacker can easily beat that.
835          *
836          * The only somewhat efficient mechanism would be if drop
837          * old connections from the same IP first. But right now
838          * we don't even record the client IP in svc_sock.
839          */
840         if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
841                 struct svc_sock *svsk = NULL;
842                 spin_lock_bh(&serv->sv_lock);
843                 if (!list_empty(&serv->sv_tempsocks)) {
844                         if (net_ratelimit()) {
845                                 /* Try to help the admin */
846                                 printk(KERN_NOTICE "%s: too many open TCP "
847                                         "sockets, consider increasing the "
848                                         "number of nfsd threads\n",
849                                                    serv->sv_name);
850                                 printk(KERN_NOTICE "%s: last TCP connect from "
851                                         "%u.%u.%u.%u:%d\n",
852                                         serv->sv_name,
853                                         NIPQUAD(sin.sin_addr.s_addr),
854                                         ntohs(sin.sin_port));
855                         }
856                         /*
857                          * Always select the oldest socket. It's not fair,
858                          * but so is life
859                          */
860                         svsk = list_entry(serv->sv_tempsocks.prev,
861                                           struct svc_sock,
862                                           sk_list);
863                         set_bit(SK_CLOSE, &svsk->sk_flags);
864                         svsk->sk_inuse ++;
865                 }
866                 spin_unlock_bh(&serv->sv_lock);
867
868                 if (svsk) {
869                         svc_sock_enqueue(svsk);
870                         svc_sock_put(svsk);
871                 }
872
873         }
874
875         if (serv->sv_stats)
876                 serv->sv_stats->nettcpconn++;
877
878         return;
879
880 failed:
881         sock_release(newsock);
882         return;
883 }
884
885 /*
886  * Receive data from a TCP socket.
887  */
888 static int
889 svc_tcp_recvfrom(struct svc_rqst *rqstp)
890 {
891         struct svc_sock *svsk = rqstp->rq_sock;
892         struct svc_serv *serv = svsk->sk_server;
893         int             len;
894         struct kvec vec[RPCSVC_MAXPAGES];
895         int pnum, vlen;
896
897         dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
898                 svsk, test_bit(SK_DATA, &svsk->sk_flags),
899                 test_bit(SK_CONN, &svsk->sk_flags),
900                 test_bit(SK_CLOSE, &svsk->sk_flags));
901
902         if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
903                 svc_sock_received(svsk);
904                 return svc_deferred_recv(rqstp);
905         }
906
907         if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
908                 svc_delete_socket(svsk);
909                 return 0;
910         }
911
912         if (test_bit(SK_CONN, &svsk->sk_flags)) {
913                 svc_tcp_accept(svsk);
914                 svc_sock_received(svsk);
915                 return 0;
916         }
917
918         if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
919                 /* sndbuf needs to have room for one request
920                  * per thread, otherwise we can stall even when the
921                  * network isn't a bottleneck.
922                  * rcvbuf just needs to be able to hold a few requests.
923                  * Normally they will be removed from the queue 
924                  * as soon a a complete request arrives.
925                  */
926                 svc_sock_setbufsize(svsk->sk_sock,
927                                     (serv->sv_nrthreads+3) * serv->sv_bufsz,
928                                     3 * serv->sv_bufsz);
929
930         clear_bit(SK_DATA, &svsk->sk_flags);
931
932         /* Receive data. If we haven't got the record length yet, get
933          * the next four bytes. Otherwise try to gobble up as much as
934          * possible up to the complete record length.
935          */
936         if (svsk->sk_tcplen < 4) {
937                 unsigned long   want = 4 - svsk->sk_tcplen;
938                 struct kvec     iov;
939
940                 iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen;
941                 iov.iov_len  = want;
942                 if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0)
943                         goto error;
944                 svsk->sk_tcplen += len;
945
946                 if (len < want) {
947                         dprintk("svc: short recvfrom while reading record length (%d of %lu)\n",
948                                 len, want);
949                         svc_sock_received(svsk);
950                         return -EAGAIN; /* record header not complete */
951                 }
952
953                 svsk->sk_reclen = ntohl(svsk->sk_reclen);
954                 if (!(svsk->sk_reclen & 0x80000000)) {
955                         /* FIXME: technically, a record can be fragmented,
956                          *  and non-terminal fragments will not have the top
957                          *  bit set in the fragment length header.
958                          *  But apparently no known nfs clients send fragmented
959                          *  records. */
960                         printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx (non-terminal)\n",
961                                (unsigned long) svsk->sk_reclen);
962                         goto err_delete;
963                 }
964                 svsk->sk_reclen &= 0x7fffffff;
965                 dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
966                 if (svsk->sk_reclen > serv->sv_bufsz) {
967                         printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx (large)\n",
968                                (unsigned long) svsk->sk_reclen);
969                         goto err_delete;
970                 }
971         }
972
973         /* Check whether enough data is available */
974         len = svc_recv_available(svsk);
975         if (len < 0)
976                 goto error;
977
978         if (len < svsk->sk_reclen) {
979                 dprintk("svc: incomplete TCP record (%d of %d)\n",
980                         len, svsk->sk_reclen);
981                 svc_sock_received(svsk);
982                 return -EAGAIN; /* record not complete */
983         }
984         len = svsk->sk_reclen;
985         set_bit(SK_DATA, &svsk->sk_flags);
986
987         vec[0] = rqstp->rq_arg.head[0];
988         vlen = PAGE_SIZE;
989         pnum = 1;
990         while (vlen < len) {
991                 vec[pnum].iov_base = page_address(rqstp->rq_argpages[rqstp->rq_argused++]);
992                 vec[pnum].iov_len = PAGE_SIZE;
993                 pnum++;
994                 vlen += PAGE_SIZE;
995         }
996
997         /* Now receive data */
998         len = svc_recvfrom(rqstp, vec, pnum, len);
999         if (len < 0)
1000                 goto error;
1001
1002         dprintk("svc: TCP complete record (%d bytes)\n", len);
1003         rqstp->rq_arg.len = len;
1004         rqstp->rq_arg.page_base = 0;
1005         if (len <= rqstp->rq_arg.head[0].iov_len) {
1006                 rqstp->rq_arg.head[0].iov_len = len;
1007                 rqstp->rq_arg.page_len = 0;
1008         } else {
1009                 rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
1010         }
1011
1012         rqstp->rq_skbuff      = NULL;
1013         rqstp->rq_prot        = IPPROTO_TCP;
1014
1015         /* Reset TCP read info */
1016         svsk->sk_reclen = 0;
1017         svsk->sk_tcplen = 0;
1018
1019         svc_sock_received(svsk);
1020         if (serv->sv_stats)
1021                 serv->sv_stats->nettcpcnt++;
1022
1023         return len;
1024
1025  err_delete:
1026         svc_delete_socket(svsk);
1027         return -EAGAIN;
1028
1029  error:
1030         if (len == -EAGAIN) {
1031                 dprintk("RPC: TCP recvfrom got EAGAIN\n");
1032                 svc_sock_received(svsk);
1033         } else {
1034                 printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
1035                                         svsk->sk_server->sv_name, -len);
1036                 svc_sock_received(svsk);
1037         }
1038
1039         return len;
1040 }
1041
1042 /*
1043  * Send out data on TCP socket.
1044  */
1045 static int
1046 svc_tcp_sendto(struct svc_rqst *rqstp)
1047 {
1048         struct xdr_buf  *xbufp = &rqstp->rq_res;
1049         int sent;
1050         u32 reclen;
1051
1052         /* Set up the first element of the reply kvec.
1053          * Any other kvecs that may be in use have been taken
1054          * care of by the server implementation itself.
1055          */
1056         reclen = htonl(0x80000000|((xbufp->len ) - 4));
1057         memcpy(xbufp->head[0].iov_base, &reclen, 4);
1058
1059         if (test_bit(SK_DEAD, &rqstp->rq_sock->sk_flags))
1060                 return -ENOTCONN;
1061
1062         sent = svc_sendto(rqstp, &rqstp->rq_res);
1063         if (sent != xbufp->len) {
1064                 printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
1065                        rqstp->rq_sock->sk_server->sv_name,
1066                        (sent<0)?"got error":"sent only",
1067                        sent, xbufp->len);
1068                 svc_delete_socket(rqstp->rq_sock);
1069                 sent = -EAGAIN;
1070         }
1071         return sent;
1072 }
1073
1074 static void
1075 svc_tcp_init(struct svc_sock *svsk)
1076 {
1077         struct sock     *sk = svsk->sk_sk;
1078         struct tcp_sock *tp = tcp_sk(sk);
1079
1080         svsk->sk_recvfrom = svc_tcp_recvfrom;
1081         svsk->sk_sendto = svc_tcp_sendto;
1082
1083         if (sk->sk_state == TCP_LISTEN) {
1084                 dprintk("setting up TCP socket for listening\n");
1085                 sk->sk_data_ready = svc_tcp_listen_data_ready;
1086                 set_bit(SK_CONN, &svsk->sk_flags);
1087         } else {
1088                 dprintk("setting up TCP socket for reading\n");
1089                 sk->sk_state_change = svc_tcp_state_change;
1090                 sk->sk_data_ready = svc_tcp_data_ready;
1091                 sk->sk_write_space = svc_write_space;
1092
1093                 svsk->sk_reclen = 0;
1094                 svsk->sk_tcplen = 0;
1095
1096                 tp->nonagle = 1;        /* disable Nagle's algorithm */
1097
1098                 /* initialise setting must have enough space to
1099                  * receive and respond to one request.  
1100                  * svc_tcp_recvfrom will re-adjust if necessary
1101                  */
1102                 svc_sock_setbufsize(svsk->sk_sock,
1103                                     3 * svsk->sk_server->sv_bufsz,
1104                                     3 * svsk->sk_server->sv_bufsz);
1105
1106                 set_bit(SK_CHNGBUF, &svsk->sk_flags);
1107                 set_bit(SK_DATA, &svsk->sk_flags);
1108                 if (sk->sk_state != TCP_ESTABLISHED) 
1109                         set_bit(SK_CLOSE, &svsk->sk_flags);
1110         }
1111 }
1112
1113 void
1114 svc_sock_update_bufs(struct svc_serv *serv)
1115 {
1116         /*
1117          * The number of server threads has changed. Update
1118          * rcvbuf and sndbuf accordingly on all sockets
1119          */
1120         struct list_head *le;
1121
1122         spin_lock_bh(&serv->sv_lock);
1123         list_for_each(le, &serv->sv_permsocks) {
1124                 struct svc_sock *svsk = 
1125                         list_entry(le, struct svc_sock, sk_list);
1126                 set_bit(SK_CHNGBUF, &svsk->sk_flags);
1127         }
1128         list_for_each(le, &serv->sv_tempsocks) {
1129                 struct svc_sock *svsk =
1130                         list_entry(le, struct svc_sock, sk_list);
1131                 set_bit(SK_CHNGBUF, &svsk->sk_flags);
1132         }
1133         spin_unlock_bh(&serv->sv_lock);
1134 }
1135
1136 /*
1137  * Receive the next request on any socket.
1138  */
1139 int
1140 svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout)
1141 {
1142         struct svc_sock         *svsk =NULL;
1143         int                     len;
1144         int                     pages;
1145         struct xdr_buf          *arg;
1146         DECLARE_WAITQUEUE(wait, current);
1147
1148         dprintk("svc: server %p waiting for data (to = %ld)\n",
1149                 rqstp, timeout);
1150
1151         if (rqstp->rq_sock)
1152                 printk(KERN_ERR 
1153                         "svc_recv: service %p, socket not NULL!\n",
1154                          rqstp);
1155         if (waitqueue_active(&rqstp->rq_wait))
1156                 printk(KERN_ERR 
1157                         "svc_recv: service %p, wait queue active!\n",
1158                          rqstp);
1159
1160         /* Initialize the buffers */
1161         /* first reclaim pages that were moved to response list */
1162         svc_pushback_allpages(rqstp);
1163
1164         /* now allocate needed pages.  If we get a failure, sleep briefly */
1165         pages = 2 + (serv->sv_bufsz + PAGE_SIZE -1) / PAGE_SIZE;
1166         while (rqstp->rq_arghi < pages) {
1167                 struct page *p = alloc_page(GFP_KERNEL);
1168                 if (!p) {
1169                         set_current_state(TASK_UNINTERRUPTIBLE);
1170                         schedule_timeout(HZ/2);
1171                         continue;
1172                 }
1173                 rqstp->rq_argpages[rqstp->rq_arghi++] = p;
1174         }
1175
1176         /* Make arg->head point to first page and arg->pages point to rest */
1177         arg = &rqstp->rq_arg;
1178         arg->head[0].iov_base = page_address(rqstp->rq_argpages[0]);
1179         arg->head[0].iov_len = PAGE_SIZE;
1180         rqstp->rq_argused = 1;
1181         arg->pages = rqstp->rq_argpages + 1;
1182         arg->page_base = 0;
1183         /* save at least one page for response */
1184         arg->page_len = (pages-2)*PAGE_SIZE;
1185         arg->len = (pages-1)*PAGE_SIZE;
1186         arg->tail[0].iov_len = 0;
1187
1188         try_to_freeze();
1189         if (signalled())
1190                 return -EINTR;
1191
1192         spin_lock_bh(&serv->sv_lock);
1193         if (!list_empty(&serv->sv_tempsocks)) {
1194                 svsk = list_entry(serv->sv_tempsocks.next,
1195                                   struct svc_sock, sk_list);
1196                 /* apparently the "standard" is that clients close
1197                  * idle connections after 5 minutes, servers after
1198                  * 6 minutes
1199                  *   http://www.connectathon.org/talks96/nfstcp.pdf 
1200                  */
1201                 if (get_seconds() - svsk->sk_lastrecv < 6*60
1202                     || test_bit(SK_BUSY, &svsk->sk_flags))
1203                         svsk = NULL;
1204         }
1205         if (svsk) {
1206                 set_bit(SK_BUSY, &svsk->sk_flags);
1207                 set_bit(SK_CLOSE, &svsk->sk_flags);
1208                 rqstp->rq_sock = svsk;
1209                 svsk->sk_inuse++;
1210         } else if ((svsk = svc_sock_dequeue(serv)) != NULL) {
1211                 rqstp->rq_sock = svsk;
1212                 svsk->sk_inuse++;
1213                 rqstp->rq_reserved = serv->sv_bufsz;    
1214                 svsk->sk_reserved += rqstp->rq_reserved;
1215         } else {
1216                 /* No data pending. Go to sleep */
1217                 svc_serv_enqueue(serv, rqstp);
1218
1219                 /*
1220                  * We have to be able to interrupt this wait
1221                  * to bring down the daemons ...
1222                  */
1223                 set_current_state(TASK_INTERRUPTIBLE);
1224                 add_wait_queue(&rqstp->rq_wait, &wait);
1225                 spin_unlock_bh(&serv->sv_lock);
1226
1227                 schedule_timeout(timeout);
1228
1229                 try_to_freeze();
1230
1231                 spin_lock_bh(&serv->sv_lock);
1232                 remove_wait_queue(&rqstp->rq_wait, &wait);
1233
1234                 if (!(svsk = rqstp->rq_sock)) {
1235                         svc_serv_dequeue(serv, rqstp);
1236                         spin_unlock_bh(&serv->sv_lock);
1237                         dprintk("svc: server %p, no data yet\n", rqstp);
1238                         return signalled()? -EINTR : -EAGAIN;
1239                 }
1240         }
1241         spin_unlock_bh(&serv->sv_lock);
1242
1243         dprintk("svc: server %p, socket %p, inuse=%d\n",
1244                  rqstp, svsk, svsk->sk_inuse);
1245         len = svsk->sk_recvfrom(rqstp);
1246         dprintk("svc: got len=%d\n", len);
1247
1248         /* No data, incomplete (TCP) read, or accept() */
1249         if (len == 0 || len == -EAGAIN) {
1250                 rqstp->rq_res.len = 0;
1251                 svc_sock_release(rqstp);
1252                 return -EAGAIN;
1253         }
1254         svsk->sk_lastrecv = get_seconds();
1255         if (test_bit(SK_TEMP, &svsk->sk_flags)) {
1256                 /* push active sockets to end of list */
1257                 spin_lock_bh(&serv->sv_lock);
1258                 if (!list_empty(&svsk->sk_list))
1259                         list_move_tail(&svsk->sk_list, &serv->sv_tempsocks);
1260                 spin_unlock_bh(&serv->sv_lock);
1261         }
1262
1263         rqstp->rq_secure  = ntohs(rqstp->rq_addr.sin_port) < 1024;
1264         rqstp->rq_chandle.defer = svc_defer;
1265
1266         if (serv->sv_stats)
1267                 serv->sv_stats->netcnt++;
1268         return len;
1269 }
1270
1271 /* 
1272  * Drop request
1273  */
1274 void
1275 svc_drop(struct svc_rqst *rqstp)
1276 {
1277         dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
1278         svc_sock_release(rqstp);
1279 }
1280
1281 /*
1282  * Return reply to client.
1283  */
1284 int
1285 svc_send(struct svc_rqst *rqstp)
1286 {
1287         struct svc_sock *svsk;
1288         int             len;
1289         struct xdr_buf  *xb;
1290
1291         if ((svsk = rqstp->rq_sock) == NULL) {
1292                 printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
1293                                 __FILE__, __LINE__);
1294                 return -EFAULT;
1295         }
1296
1297         /* release the receive skb before sending the reply */
1298         svc_release_skb(rqstp);
1299
1300         /* calculate over-all length */
1301         xb = & rqstp->rq_res;
1302         xb->len = xb->head[0].iov_len +
1303                 xb->page_len +
1304                 xb->tail[0].iov_len;
1305
1306         /* Grab svsk->sk_sem to serialize outgoing data. */
1307         down(&svsk->sk_sem);
1308         if (test_bit(SK_DEAD, &svsk->sk_flags))
1309                 len = -ENOTCONN;
1310         else
1311                 len = svsk->sk_sendto(rqstp);
1312         up(&svsk->sk_sem);
1313         svc_sock_release(rqstp);
1314
1315         if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
1316                 return 0;
1317         return len;
1318 }
1319
1320 /*
1321  * Initialize socket for RPC use and create svc_sock struct
1322  * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
1323  */
1324 static struct svc_sock *
1325 svc_setup_socket(struct svc_serv *serv, struct socket *sock,
1326                                         int *errp, int pmap_register)
1327 {
1328         struct svc_sock *svsk;
1329         struct sock     *inet;
1330
1331         dprintk("svc: svc_setup_socket %p\n", sock);
1332         if (!(svsk = kmalloc(sizeof(*svsk), GFP_KERNEL))) {
1333                 *errp = -ENOMEM;
1334                 return NULL;
1335         }
1336         memset(svsk, 0, sizeof(*svsk));
1337
1338         inet = sock->sk;
1339
1340         /* Register socket with portmapper */
1341         if (*errp >= 0 && pmap_register)
1342                 *errp = svc_register(serv, inet->sk_protocol,
1343                                      ntohs(inet_sk(inet)->sport));
1344
1345         if (*errp < 0) {
1346                 kfree(svsk);
1347                 return NULL;
1348         }
1349
1350         set_bit(SK_BUSY, &svsk->sk_flags);
1351         inet->sk_user_data = svsk;
1352         svsk->sk_sock = sock;
1353         svsk->sk_sk = inet;
1354         svsk->sk_ostate = inet->sk_state_change;
1355         svsk->sk_odata = inet->sk_data_ready;
1356         svsk->sk_owspace = inet->sk_write_space;
1357         svsk->sk_server = serv;
1358         svsk->sk_lastrecv = get_seconds();
1359         INIT_LIST_HEAD(&svsk->sk_deferred);
1360         INIT_LIST_HEAD(&svsk->sk_ready);
1361         sema_init(&svsk->sk_sem, 1);
1362
1363         /* Initialize the socket */
1364         if (sock->type == SOCK_DGRAM)
1365                 svc_udp_init(svsk);
1366         else
1367                 svc_tcp_init(svsk);
1368
1369         spin_lock_bh(&serv->sv_lock);
1370         if (!pmap_register) {
1371                 set_bit(SK_TEMP, &svsk->sk_flags);
1372                 list_add(&svsk->sk_list, &serv->sv_tempsocks);
1373                 serv->sv_tmpcnt++;
1374         } else {
1375                 clear_bit(SK_TEMP, &svsk->sk_flags);
1376                 list_add(&svsk->sk_list, &serv->sv_permsocks);
1377         }
1378         spin_unlock_bh(&serv->sv_lock);
1379
1380         dprintk("svc: svc_setup_socket created %p (inet %p)\n",
1381                                 svsk, svsk->sk_sk);
1382
1383         clear_bit(SK_BUSY, &svsk->sk_flags);
1384         svc_sock_enqueue(svsk);
1385         return svsk;
1386 }
1387
1388 /*
1389  * Create socket for RPC service.
1390  */
1391 static int
1392 svc_create_socket(struct svc_serv *serv, int protocol, struct sockaddr_in *sin)
1393 {
1394         struct svc_sock *svsk;
1395         struct socket   *sock;
1396         int             error;
1397         int             type;
1398
1399         dprintk("svc: svc_create_socket(%s, %d, %u.%u.%u.%u:%d)\n",
1400                                 serv->sv_program->pg_name, protocol,
1401                                 NIPQUAD(sin->sin_addr.s_addr),
1402                                 ntohs(sin->sin_port));
1403
1404         if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
1405                 printk(KERN_WARNING "svc: only UDP and TCP "
1406                                 "sockets supported\n");
1407                 return -EINVAL;
1408         }
1409         type = (protocol == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1410
1411         if ((error = sock_create_kern(PF_INET, type, protocol, &sock)) < 0)
1412                 return error;
1413
1414         if (sin != NULL) {
1415                 if (type == SOCK_STREAM)
1416                         sock->sk->sk_reuse = 1; /* allow address reuse */
1417                 error = sock->ops->bind(sock, (struct sockaddr *) sin,
1418                                                 sizeof(*sin));
1419                 if (error < 0)
1420                         goto bummer;
1421         }
1422
1423         if (protocol == IPPROTO_TCP) {
1424                 if ((error = sock->ops->listen(sock, 64)) < 0)
1425                         goto bummer;
1426         }
1427
1428         if ((svsk = svc_setup_socket(serv, sock, &error, 1)) != NULL)
1429                 return 0;
1430
1431 bummer:
1432         dprintk("svc: svc_create_socket error = %d\n", -error);
1433         sock_release(sock);
1434         return error;
1435 }
1436
1437 /*
1438  * Remove a dead socket
1439  */
1440 void
1441 svc_delete_socket(struct svc_sock *svsk)
1442 {
1443         struct svc_serv *serv;
1444         struct sock     *sk;
1445
1446         dprintk("svc: svc_delete_socket(%p)\n", svsk);
1447
1448         serv = svsk->sk_server;
1449         sk = svsk->sk_sk;
1450
1451         sk->sk_state_change = svsk->sk_ostate;
1452         sk->sk_data_ready = svsk->sk_odata;
1453         sk->sk_write_space = svsk->sk_owspace;
1454
1455         spin_lock_bh(&serv->sv_lock);
1456
1457         list_del_init(&svsk->sk_list);
1458         list_del_init(&svsk->sk_ready);
1459         if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags))
1460                 if (test_bit(SK_TEMP, &svsk->sk_flags))
1461                         serv->sv_tmpcnt--;
1462
1463         if (!svsk->sk_inuse) {
1464                 spin_unlock_bh(&serv->sv_lock);
1465                 sock_release(svsk->sk_sock);
1466                 kfree(svsk);
1467         } else {
1468                 spin_unlock_bh(&serv->sv_lock);
1469                 dprintk(KERN_NOTICE "svc: server socket destroy delayed\n");
1470                 /* svsk->sk_server = NULL; */
1471         }
1472 }
1473
1474 /*
1475  * Make a socket for nfsd and lockd
1476  */
1477 int
1478 svc_makesock(struct svc_serv *serv, int protocol, unsigned short port)
1479 {
1480         struct sockaddr_in      sin;
1481
1482         dprintk("svc: creating socket proto = %d\n", protocol);
1483         sin.sin_family      = AF_INET;
1484         sin.sin_addr.s_addr = INADDR_ANY;
1485         sin.sin_port        = htons(port);
1486         return svc_create_socket(serv, protocol, &sin);
1487 }
1488
1489 /*
1490  * Handle defer and revisit of requests 
1491  */
1492
1493 static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
1494 {
1495         struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
1496         struct svc_serv *serv = dreq->owner;
1497         struct svc_sock *svsk;
1498
1499         if (too_many) {
1500                 svc_sock_put(dr->svsk);
1501                 kfree(dr);
1502                 return;
1503         }
1504         dprintk("revisit queued\n");
1505         svsk = dr->svsk;
1506         dr->svsk = NULL;
1507         spin_lock_bh(&serv->sv_lock);
1508         list_add(&dr->handle.recent, &svsk->sk_deferred);
1509         spin_unlock_bh(&serv->sv_lock);
1510         set_bit(SK_DEFERRED, &svsk->sk_flags);
1511         svc_sock_enqueue(svsk);
1512         svc_sock_put(svsk);
1513 }
1514
1515 static struct cache_deferred_req *
1516 svc_defer(struct cache_req *req)
1517 {
1518         struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
1519         int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len);
1520         struct svc_deferred_req *dr;
1521
1522         if (rqstp->rq_arg.page_len)
1523                 return NULL; /* if more than a page, give up FIXME */
1524         if (rqstp->rq_deferred) {
1525                 dr = rqstp->rq_deferred;
1526                 rqstp->rq_deferred = NULL;
1527         } else {
1528                 int skip  = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
1529                 /* FIXME maybe discard if size too large */
1530                 dr = kmalloc(size, GFP_KERNEL);
1531                 if (dr == NULL)
1532                         return NULL;
1533
1534                 dr->handle.owner = rqstp->rq_server;
1535                 dr->prot = rqstp->rq_prot;
1536                 dr->addr = rqstp->rq_addr;
1537                 dr->argslen = rqstp->rq_arg.len >> 2;
1538                 memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2);
1539         }
1540         spin_lock_bh(&rqstp->rq_server->sv_lock);
1541         rqstp->rq_sock->sk_inuse++;
1542         dr->svsk = rqstp->rq_sock;
1543         spin_unlock_bh(&rqstp->rq_server->sv_lock);
1544
1545         dr->handle.revisit = svc_revisit;
1546         return &dr->handle;
1547 }
1548
1549 /*
1550  * recv data from a deferred request into an active one
1551  */
1552 static int svc_deferred_recv(struct svc_rqst *rqstp)
1553 {
1554         struct svc_deferred_req *dr = rqstp->rq_deferred;
1555
1556         rqstp->rq_arg.head[0].iov_base = dr->args;
1557         rqstp->rq_arg.head[0].iov_len = dr->argslen<<2;
1558         rqstp->rq_arg.page_len = 0;
1559         rqstp->rq_arg.len = dr->argslen<<2;
1560         rqstp->rq_prot        = dr->prot;
1561         rqstp->rq_addr        = dr->addr;
1562         return dr->argslen<<2;
1563 }
1564
1565
1566 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk)
1567 {
1568         struct svc_deferred_req *dr = NULL;
1569         struct svc_serv *serv = svsk->sk_server;
1570         
1571         if (!test_bit(SK_DEFERRED, &svsk->sk_flags))
1572                 return NULL;
1573         spin_lock_bh(&serv->sv_lock);
1574         clear_bit(SK_DEFERRED, &svsk->sk_flags);
1575         if (!list_empty(&svsk->sk_deferred)) {
1576                 dr = list_entry(svsk->sk_deferred.next,
1577                                 struct svc_deferred_req,
1578                                 handle.recent);
1579                 list_del_init(&dr->handle.recent);
1580                 set_bit(SK_DEFERRED, &svsk->sk_flags);
1581         }
1582         spin_unlock_bh(&serv->sv_lock);
1583         return dr;
1584 }