[PATCH] RPC: introduce client-side transport switch

/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      socket's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that socket. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 */
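
/*
 * A minimal sketch of the call flow described above, using the entry
 * points defined later in this file.  Task setup and message encoding
 * normally live in the rpc_clnt state machine in net/sunrpc/clnt.c, so
 * this fragment is illustrative only and is compiled out.
 */
#if 0
static void example_call_flow(struct rpc_task *task)
{
        xprt_reserve(task);             /* get a slot, or sleep on the backlog */
        if (task->tk_status < 0)
                return;                 /* e.g. -EAGAIN: queued on the backlog */
        /* ... encode the RPC message into task->tk_rqstp ... */
        if (xprt_prepare_transmit(task) == 0)
                xprt_transmit(task);    /* send, then sleep on xprt->pending */
        /* on wake-up the reply is in place, or tk_status holds an error */
        xprt_release(task);             /* free the slot, wake a backlogged task */
}
#endif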

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <linux/sunrpc/clnt.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * pprintk() guards the profiling printks used below (in
 * xprt_adjust_timeout and the RPC_PROFILE block of xprt_complete_rqst);
 * compile it away unless profiling is enabled, so the symbol is
 * always defined.
 */
#ifdef RPC_PROFILE
# define pprintk(args...)       printk(args)
#else
# define pprintk(args...)       do { } while (0)
#endif

#define XPRT_MAX_BACKOFF        (8)

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static int      xprt_clear_backlog(struct rpc_xprt *xprt);

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (xprt->nocong || __xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->sockstate);
        smp_mb__after_clear_bit();
out_sleep:
        dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->sock_lock);
        retval = __xprt_lock_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
        return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
                return;
        if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (xprt->nocong || __xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->sockstate);
        smp_mb__after_clear_bit();
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt->snd_task = NULL;
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->sockstate);
                smp_mb__after_clear_bit();
                __xprt_lock_write_next(xprt);
        }
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}
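
/*
 * A minimal sketch of how the write lock is paired in practice: a
 * hypothetical sender takes the lock, writes, and releases it so that
 * __xprt_lock_write_next() can hand the socket to the next queued
 * task.  Illustrative only, compiled out.
 */
#if 0
static void example_locked_send(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (!xprt_lock_write(xprt, task))
                return;         /* task now sleeps on xprt->sending or ->resend */
        /* ... write the request to the socket ... */
        xprt_release_write(xprt, task); /* wakes the next waiting sender */
}
#endif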

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
        unsigned long   cwnd;

        cwnd = xprt->cwnd;
        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
}
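
/*
 * Worked example, assuming RPC_CWNDSCALE is 256 (one window "slot", as
 * defined in the sunrpc headers): with cwnd = 1024, i.e. four slots, a
 * successful reply adds (256 * 256 + 512) / 1024 = 64, so it takes
 * roughly a full window of replies to grow the window by one slot
 * (additive increase).  A timeout halves cwnd to 512 (multiplicative
 * decrease), never dropping below one slot.
 */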

/*
 * Reset the major timeout value
 */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        struct rpc_timeout *to = &req->rq_xprt->timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}
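
/*
 * Worked example: with to_initval = 5 seconds and to_retries = 3,
 * exponential backoff yields rq_majortimeo = 5s << 3 = 40s from now,
 * while linear backoff with to_increment = 5s yields 5s + 5s * 3 = 20s.
 * Either result is clamped to to_maxval before jiffies is added.
 */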

/*
 * Adjust timeout values etc for next retransmit
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct rpc_timeout *to = &xprt->timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
                pprintk("RPC: %lu retrans\n", jiffies);
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->sock_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->sock_lock);
                pprintk("RPC: %lu timeout\n", jiffies);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}

static void
xprt_socket_autoclose(void *args)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;

        xprt_disconnect(xprt);
        xprt->ops->close(xprt);
        xprt_release_write(xprt, NULL);
}

/*
 * Mark a transport as disconnected
 */
void xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC:      disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->sock_lock);
        xprt_clear_connected(xprt);
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Used to allow disconnection when we've been idle
 */
static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->sock_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
                goto out_abort;
        spin_unlock(&xprt->sock_lock);
        /* Let keventd close the socket */
        if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0)
                xprt_release_write(xprt, NULL);
        else
                schedule_work(&xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->sock_lock);
}

/*
 * Attempt to establish a transport connection.
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (xprt->shutdown) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = RPC_CONNECT_TIMEOUT;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
                xprt->ops->connect(task);
        }
        return;
}

/*
 * We arrive here when awoken from waiting on connection establishment.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -ECONNREFUSED:
        case -ECONNRESET:
                dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
                                task->tk_pid, task->tk_client->cl_server);
                break;
        case -ENOTCONN:
                dprintk("RPC: %4d xprt_connect_status: connection broken\n",
                                task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
                                task->tk_pid);
                break;
        default:
                dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
                                task->tk_pid, -task->tk_status, task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
                return;
        }

        /* if soft mounted, just cause this RPC to fail */
        if (RPC_IS_SOFT(task)) {
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/*
 * Look up the RPC request corresponding to a reply.  The caller must
 * hold xprt->sock_lock, so that the request cannot be released while
 * it is being examined.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct list_head *pos;
        struct rpc_rqst *req = NULL;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid) {
                        req = entry;
                        break;
                }
        }
        return req;
}
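
/*
 * A sketch of the intended use, modelled on a datagram receive path:
 * the caller holds xprt->sock_lock across lookup and completion.  The
 * name xs_udp_example_ready is hypothetical; the real handlers live in
 * the transport implementations.  Compiled out.
 */
#if 0
static void xs_udp_example_ready(struct rpc_xprt *xprt, u32 xid, int copied)
{
        struct rpc_rqst *req;

        spin_lock(&xprt->sock_lock);
        req = xprt_lookup_rqst(xprt, xid);
        if (req != NULL)
                xprt_complete_rqst(xprt, req, copied);
        spin_unlock(&xprt->sock_lock);
}
#endif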

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
        struct rpc_task *task = req->rq_task;
        struct rpc_clnt *clnt = task->tk_client;

        /* Adjust congestion window */
        if (!xprt->nocong) {
                unsigned timer = task->tk_msg.rpc_proc->p_timer;
                xprt_adjust_cwnd(xprt, copied);
                __xprt_put_cong(xprt, req);
                if (timer) {
                        if (req->rq_ntrans == 1)
                                rpc_update_rtt(clnt->cl_rtt, timer,
                                                (long)jiffies - req->rq_xtime);
                        rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
                }
        }

#ifdef RPC_PROFILE
        /* Profile only reads for now */
        if (copied > 1024) {
                static unsigned long    nextstat;
                static unsigned long    pkt_rtt, pkt_len, pkt_cnt;

                pkt_cnt++;
                pkt_len += req->rq_slen + copied;
                pkt_rtt += jiffies - req->rq_xtime;
                if (time_before(nextstat, jiffies)) {
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
                        printk("RPC: %ld %ld %ld %ld stat\n",
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
                        pkt_rtt = pkt_len = pkt_cnt = 0;
                        nextstat = jiffies + 5 * HZ;
                }
        }
#endif

        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        list_del_init(&req->rq_list);
        req->rq_received = req->rq_private_buf.len = copied;

        /* ... and wake up the process. */
        rpc_wake_up_task(task);
        return;
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->sock_lock);
        if (req->rq_received)
                goto out;

        xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
        __xprt_put_cong(xprt, req);

        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status  = -ETIMEDOUT;
out:
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->sock_lock);
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
int
xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

        if (xprt->shutdown)
                return -EIO;

        spin_lock_bh(&xprt->sock_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!__xprt_lock_write(xprt, task)) {
                err = -EAGAIN;
                goto out_unlock;
        }

        if (!xprt_connected(xprt)) {
                err = -ENOTCONN;
                goto out_unlock;
        }
out_unlock:
        spin_unlock_bh(&xprt->sock_lock);
        return err;
}

void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        smp_rmb();
        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->sock_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->sock_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        status = xprt->ops->send_request(task);
        if (!status)
                goto out_receive;

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;

        switch (status) {
        case -ECONNREFUSED:
                task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
                /* fall through */
        case -EAGAIN:
        case -ENOTCONN:
                return;
        default:
                if (xprt->stream)
                        xprt_disconnect(xprt);
        }
        xprt_release_write(xprt, task);
        return;
out_receive:
        dprintk("RPC: %4d xmit complete\n", task->tk_pid);
        /* Set the task's receive timeout value */
        spin_lock_bh(&xprt->sock_lock);
        if (!xprt->nocong) {
                int timer = task->tk_msg.rpc_proc->p_timer;
                task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
                task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
                if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
                        task->tk_timeout = xprt->timeout.to_maxval;
        } else
                task->tk_timeout = req->rq_timeout;
        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
        else if (!req->rq_received)
                rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}
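
/*
 * Worked example for the RTT-based receive timeout above: if
 * rpc_calc_rto() estimates 2 seconds for this procedure class, the
 * request has been retried twice, and rpc_ntimeo() reports no major
 * timeouts, then tk_timeout = 2s << (0 + 2) = 8s, clamped to
 * xprt->timeout.to_maxval.
 */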

/*
 * Reserve an RPC call slot.
 */
static inline void
do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:      waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

void
xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        if (!xprt->shutdown) {
                spin_lock(&xprt->xprt_lock);
                do_xprt_reserve(task);
                spin_unlock(&xprt->xprt_lock);
        }
}

/*
 * Allocate a 'unique' XID
 */
static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}
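
/*
 * Example: if get_random_bytes() happens to seed xprt->xid with
 * 0x3e8f9a21, the first three requests carry XIDs 0x3e8f9a21,
 * 0x3e8f9a22 and 0x3e8f9a23.  The random starting point makes it
 * unlikely that XIDs from before a client reboot are reused, which
 * matters to servers that keep a duplicate request cache keyed by XID.
 */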

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xprt_alloc_xid(xprt);
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        __xprt_put_cong(xprt, req);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv) && !xprt->shutdown)
                mod_timer(&xprt->timer,
                                xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT);
        spin_unlock_bh(&xprt->sock_lock);
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->xprt_lock);
        list_add(&req->rq_list, &xprt->free);
        xprt_clear_backlog(xprt);
        spin_unlock(&xprt->xprt_lock);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_initval   = to->to_increment = incr;
        to->to_maxval    = to->to_initval + (incr * retr);
        to->to_retries   = retr;
        to->to_exponential = 0;
}
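
/*
 * Example: for five linear retries at 5-second intervals (illustrative
 * values, not necessarily any transport's defaults):
 *
 *      struct rpc_timeout to;
 *
 *      xprt_set_timeout(&to, 5, 5 * HZ);
 *
 * leaves to_initval = to_increment = 5s and to_maxval = 30s, so the
 * major timeout fires after the full series of retransmissions.
 */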

/*
 * Initialize an RPC transport.
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
        int result;
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return ERR_PTR(-ENOMEM);
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;

        switch (proto) {
        case IPPROTO_UDP:
                result = xs_setup_udp(xprt, to);
                break;
        case IPPROTO_TCP:
                result = xs_setup_tcp(xprt, to);
                break;
        default:
                printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
                                proto);
                result = -EIO;
                break;
        }
        if (result) {
                kfree(xprt);
                return ERR_PTR(result);
        }

        spin_lock_init(&xprt->sock_lock);
        spin_lock_init(&xprt->xprt_lock);
        init_waitqueue_head(&xprt->cong_wait);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
        init_timer(&xprt->timer);
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;

        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC:      created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;

        xprt = xprt_setup(proto, sap, to);
        if (IS_ERR(xprt))
                dprintk("RPC:      xprt_create_proto failed\n");
        else
                dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
        return xprt;
}
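
/*
 * A sketch of transport creation as a client might perform it.  The
 * name example_create is hypothetical, and the NULL timeout is assumed
 * here to let the protocol setup routine fall back to its own default
 * values.  Compiled out.
 */
#if 0
static struct rpc_xprt *example_create(struct sockaddr_in *servaddr)
{
        struct rpc_xprt *xprt;

        xprt = xprt_create_proto(IPPROTO_UDP, servaddr, NULL);
        if (IS_ERR(xprt))
                return NULL;    /* e.g. -ENOMEM or an unknown protocol */
        return xprt;
}
#endif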

/*
 * Prepare for transport shutdown.
 */
static void
xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->resend);
        rpc_wake_up(&xprt->pending);
        rpc_wake_up(&xprt->backlog);
        wake_up(&xprt->cong_wait);
        del_timer_sync(&xprt->timer);
}

/*
 * Clear the xprt backlog queue
 */
static int
xprt_clear_backlog(struct rpc_xprt *xprt)
{
        rpc_wake_up_next(&xprt->backlog);
        wake_up(&xprt->cong_wait);
        return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      destroying transport %p\n", xprt);
        xprt_shutdown(xprt);
        xprt->ops->destroy(xprt);
        kfree(xprt);

        return 0;
}