/*
 * net/sunrpc/clnt.c -- high-level RPC client interface (Linux 2.6 tree).
 * Patch context: "RPC: Allow multiple RPC client programs to share the
 * same transport".
 */
1 /*
2  *  linux/net/sunrpc/rpcclnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23
24 #include <asm/system.h>
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/in.h>
31 #include <linux/utsname.h>
32
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/workqueue.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36
37 #include <linux/nfs.h>
38
39
40 #define RPC_SLACK_SPACE         (1024)  /* total overkill */
41
42 #ifdef RPC_DEBUG
43 # define RPCDBG_FACILITY        RPCDBG_CALL
44 #endif
45
46 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
47
48
49 static void     call_start(struct rpc_task *task);
50 static void     call_reserve(struct rpc_task *task);
51 static void     call_reserveresult(struct rpc_task *task);
52 static void     call_allocate(struct rpc_task *task);
53 static void     call_encode(struct rpc_task *task);
54 static void     call_decode(struct rpc_task *task);
55 static void     call_bind(struct rpc_task *task);
56 static void     call_transmit(struct rpc_task *task);
57 static void     call_status(struct rpc_task *task);
58 static void     call_refresh(struct rpc_task *task);
59 static void     call_refreshresult(struct rpc_task *task);
60 static void     call_timeout(struct rpc_task *task);
61 static void     call_connect(struct rpc_task *task);
62 static void     call_connect_status(struct rpc_task *task);
63 static u32 *    call_header(struct rpc_task *task);
64 static u32 *    call_verify(struct rpc_task *task);
65
66
67 static int
68 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
69 {
70         static uint32_t clntid;
71         int error;
72
73         if (dir_name == NULL)
74                 return 0;
75         for (;;) {
76                 snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
77                                 "%s/clnt%x", dir_name,
78                                 (unsigned int)clntid++);
79                 clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
80                 clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
81                 if (!IS_ERR(clnt->cl_dentry))
82                         return 0;
83                 error = PTR_ERR(clnt->cl_dentry);
84                 if (error != -EEXIST) {
85                         printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
86                                         clnt->cl_pathname, error);
87                         return error;
88                 }
89         }
90 }
91
92 /*
93  * Create an RPC client
94  * FIXME: This should also take a flags argument (as in task->tk_flags).
95  * It's called (among others) from pmap_create_client, which may in
96  * turn be called by an async task. In this case, rpciod should not be
97  * made to sleep too long.
98  */
99 struct rpc_clnt *
100 rpc_new_client(struct rpc_xprt *xprt, char *servname,
101                   struct rpc_program *program, u32 vers,
102                   rpc_authflavor_t flavor)
103 {
104         struct rpc_version      *version;
105         struct rpc_clnt         *clnt = NULL;
106         struct rpc_auth         *auth;
107         int err;
108         int len;
109
110         dprintk("RPC: creating %s client for %s (xprt %p)\n",
111                 program->name, servname, xprt);
112
113         err = -EINVAL;
114         if (!xprt)
115                 goto out_err;
116         if (vers >= program->nrvers || !(version = program->version[vers]))
117                 goto out_err;
118
119         err = -ENOMEM;
120         clnt = (struct rpc_clnt *) kmalloc(sizeof(*clnt), GFP_KERNEL);
121         if (!clnt)
122                 goto out_err;
123         memset(clnt, 0, sizeof(*clnt));
124         atomic_set(&clnt->cl_users, 0);
125         atomic_set(&clnt->cl_count, 1);
126         clnt->cl_parent = clnt;
127
128         clnt->cl_server = clnt->cl_inline_name;
129         len = strlen(servname) + 1;
130         if (len > sizeof(clnt->cl_inline_name)) {
131                 char *buf = kmalloc(len, GFP_KERNEL);
132                 if (buf != 0)
133                         clnt->cl_server = buf;
134                 else
135                         len = sizeof(clnt->cl_inline_name);
136         }
137         strlcpy(clnt->cl_server, servname, len);
138
139         clnt->cl_xprt     = xprt;
140         clnt->cl_procinfo = version->procs;
141         clnt->cl_maxproc  = version->nrprocs;
142         clnt->cl_protname = program->name;
143         clnt->cl_pmap     = &clnt->cl_pmap_default;
144         clnt->cl_port     = xprt->addr.sin_port;
145         clnt->cl_prog     = program->number;
146         clnt->cl_vers     = version->number;
147         clnt->cl_prot     = xprt->prot;
148         clnt->cl_stats    = program->stats;
149         rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
150
151         if (!clnt->cl_port)
152                 clnt->cl_autobind = 1;
153
154         clnt->cl_rtt = &clnt->cl_rtt_default;
155         rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
156
157         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
158         if (err < 0)
159                 goto out_no_path;
160
161         auth = rpcauth_create(flavor, clnt);
162         if (IS_ERR(auth)) {
163                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
164                                 flavor);
165                 err = PTR_ERR(auth);
166                 goto out_no_auth;
167         }
168
169         /* save the nodename */
170         clnt->cl_nodelen = strlen(system_utsname.nodename);
171         if (clnt->cl_nodelen > UNX_MAXNODENAME)
172                 clnt->cl_nodelen = UNX_MAXNODENAME;
173         memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
174         return clnt;
175
176 out_no_auth:
177         rpc_rmdir(clnt->cl_pathname);
178 out_no_path:
179         if (clnt->cl_server != clnt->cl_inline_name)
180                 kfree(clnt->cl_server);
181         kfree(clnt);
182 out_err:
183         xprt_destroy(xprt);
184         return ERR_PTR(err);
185 }
186
187 /**
188  * Create an RPC client
189  * @xprt - pointer to xprt struct
190  * @servname - name of server
191  * @info - rpc_program
192  * @version - rpc_program version
193  * @authflavor - rpc_auth flavour to use
194  *
195  * Creates an RPC client structure, then pings the server in order to
196  * determine if it is up, and if it supports this program and version.
197  *
198  * This function should never be called by asynchronous tasks such as
199  * the portmapper.
200  */
201 struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
202                 struct rpc_program *info, u32 version, rpc_authflavor_t authflavor)
203 {
204         struct rpc_clnt *clnt;
205         int err;
206         
207         clnt = rpc_new_client(xprt, servname, info, version, authflavor);
208         if (IS_ERR(clnt))
209                 return clnt;
210         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
211         if (err == 0)
212                 return clnt;
213         rpc_shutdown_client(clnt);
214         return ERR_PTR(err);
215 }
216
217 /*
218  * This function clones the RPC client structure. It allows us to share the
219  * same transport while varying parameters such as the authentication
220  * flavour.
221  */
222 struct rpc_clnt *
223 rpc_clone_client(struct rpc_clnt *clnt)
224 {
225         struct rpc_clnt *new;
226
227         new = (struct rpc_clnt *)kmalloc(sizeof(*new), GFP_KERNEL);
228         if (!new)
229                 goto out_no_clnt;
230         memcpy(new, clnt, sizeof(*new));
231         atomic_set(&new->cl_count, 1);
232         atomic_set(&new->cl_users, 0);
233         new->cl_parent = clnt;
234         atomic_inc(&clnt->cl_count);
235         /* Duplicate portmapper */
236         rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
237         /* Turn off autobind on clones */
238         new->cl_autobind = 0;
239         new->cl_oneshot = 0;
240         new->cl_dead = 0;
241         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
242         if (new->cl_auth)
243                 atomic_inc(&new->cl_auth->au_count);
244         new->cl_pmap            = &new->cl_pmap_default;
245         rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
246         return new;
247 out_no_clnt:
248         printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
249         return ERR_PTR(-ENOMEM);
250 }
251
252 /*
253  * Properly shut down an RPC client, terminating all outstanding
254  * requests. Note that we must be certain that cl_oneshot and
255  * cl_dead are cleared, or else the client would be destroyed
256  * when the last task releases it.
257  */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
        dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
                        clnt->cl_protname, clnt->cl_server,
                        atomic_read(&clnt->cl_users));

        /* Kill outstanding tasks and wait (in 1s intervals) until the
         * last user drops its reference. */
        while (atomic_read(&clnt->cl_users) > 0) {
                /* Don't let rpc_release_client destroy us */
                clnt->cl_oneshot = 0;
                clnt->cl_dead = 0;
                rpc_killall_tasks(clnt);
                sleep_on_timeout(&destroy_wait, 1*HZ);
        }

        /* A negative user count indicates a refcounting bug somewhere */
        if (atomic_read(&clnt->cl_users) < 0) {
                printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
                                clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
                rpc_show_tasks();
#endif
                BUG();
        }

        return rpc_destroy_client(clnt);
}
284
285 /*
286  * Delete an RPC client
287  */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
        /* Drop one reference; only the last reference frees the client */
        if (!atomic_dec_and_test(&clnt->cl_count))
                return 1;
        BUG_ON(atomic_read(&clnt->cl_users) != 0);

        dprintk("RPC: destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
        if (clnt->cl_auth) {
                rpcauth_destroy(clnt->cl_auth);
                clnt->cl_auth = NULL;
        }
        /* A clone releases its parent's reference; the pipefs entry,
         * transport and server-name buffer belong to the parent only. */
        if (clnt->cl_parent != clnt) {
                rpc_destroy_client(clnt->cl_parent);
                goto out_free;
        }
        if (clnt->cl_pathname[0])
                rpc_rmdir(clnt->cl_pathname);
        if (clnt->cl_xprt) {
                xprt_destroy(clnt->cl_xprt);
                clnt->cl_xprt = NULL;
        }
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
out_free:
        kfree(clnt);
        return 0;
}
317
318 /*
319  * Release an RPC client
320  */
321 void
322 rpc_release_client(struct rpc_clnt *clnt)
323 {
324         dprintk("RPC:      rpc_release_client(%p, %d)\n",
325                                 clnt, atomic_read(&clnt->cl_users));
326
327         if (!atomic_dec_and_test(&clnt->cl_users))
328                 return;
329         wake_up(&destroy_wait);
330         if (clnt->cl_oneshot || clnt->cl_dead)
331                 rpc_destroy_client(clnt);
332 }
333
334 /**
335  * rpc_bind_new_program - bind a new RPC program to an existing client
336  * @old - old rpc_client
337  * @program - rpc program to set
338  * @vers - rpc program version
339  *
340  * Clones the rpc client and sets up a new RPC program. This is mainly
341  * of use for enabling different RPC programs to share the same transport.
342  * The Sun NFSv2/v3 ACL protocol can do this.
343  */
344 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
345                                       struct rpc_program *program,
346                                       int vers)
347 {
348         struct rpc_clnt *clnt;
349         struct rpc_version *version;
350         int err;
351
352         BUG_ON(vers >= program->nrvers || !program->version[vers]);
353         version = program->version[vers];
354         clnt = rpc_clone_client(old);
355         if (IS_ERR(clnt))
356                 goto out;
357         clnt->cl_procinfo = version->procs;
358         clnt->cl_maxproc  = version->nrprocs;
359         clnt->cl_protname = program->name;
360         clnt->cl_prog     = program->number;
361         clnt->cl_vers     = version->number;
362         clnt->cl_stats    = program->stats;
363         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
364         if (err != 0) {
365                 rpc_shutdown_client(clnt);
366                 clnt = ERR_PTR(err);
367         }
368 out:    
369         return clnt;
370 }
371
372 /*
373  * Default callback for async RPC calls
374  */
/* No-op completion callback installed by rpc_call_async() when the
 * caller does not supply one. */
static void
rpc_default_callback(struct rpc_task *task)
{
}
379
380 /*
381  *      Export the signal mask handling for aysnchronous code that
382  *      sleeps on RPC calls
383  */
384  
void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        /* SIGKILL must always get through so an RPC wait can be killed */
        unsigned long   sigallow = sigmask(SIGKILL);
        unsigned long   irqflags;
        
        /* Turn off various signals */
        if (clnt->cl_intr) {
                /* For interruptible clients, also allow SIGINT/SIGQUIT --
                 * but only when they would take the default action, i.e.
                 * the process has not installed its own handlers. */
                struct k_sigaction *action = current->sighand->action;
                if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGINT);
                if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGQUIT);
        }
        /* Save the current mask and block everything except the allowed
         * signals (and any already-unblocked ones). */
        spin_lock_irqsave(&current->sighand->siglock, irqflags);
        *oldset = current->blocked;
        siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}
404
405 void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
406 {
407         unsigned long   irqflags;
408         
409         spin_lock_irqsave(&current->sighand->siglock, irqflags);
410         current->blocked = *oldset;
411         recalc_sigpending();
412         spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
413 }
414
415 /*
416  * New rpc_call implementation
417  */
/* Perform a synchronous RPC call described by @msg on @clnt.
 * Returns the task status, -EIO for a dead client, or -ENOMEM if no
 * task could be allocated.  Must not be used with RPC_TASK_ASYNC. */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
        struct rpc_task *task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead) 
                return -EIO;

        BUG_ON(flags & RPC_TASK_ASYNC);

        /* Block most signals for the duration of the call */
        rpc_clnt_sigmask(clnt, &oldset);                

        status = -ENOMEM;
        task = rpc_new_task(clnt, NULL, flags);
        if (task == NULL)
                goto out;

        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        if (task->tk_status == 0)
                status = rpc_execute(task);
        else {
                /* Setup failed: report the error and drop the task */
                status = task->tk_status;
                rpc_release_task(task);
        }

out:
        rpc_clnt_sigunmask(clnt, &oldset);              

        return status;
}
452
453 /*
454  * New rpc_call implementation
455  */
/* Start an asynchronous RPC call described by @msg on @clnt.
 * @callback is invoked on completion (a no-op default is used if NULL)
 * with @data available as task->tk_calldata.  Returns 0 if the task was
 * queued, -EIO for a dead client, or a negative setup error. */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
               rpc_action callback, void *data)
{
        struct rpc_task *task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead) 
                return -EIO;

        flags |= RPC_TASK_ASYNC;

        /* Block most signals while setting up the task */
        rpc_clnt_sigmask(clnt, &oldset);                

        /* Create/initialize a new RPC task */
        if (!callback)
                callback = rpc_default_callback;
        status = -ENOMEM;
        if (!(task = rpc_new_task(clnt, callback, flags)))
                goto out;
        task->tk_calldata = data;

        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        status = task->tk_status;
        if (status == 0)
                rpc_execute(task);
        else
                rpc_release_task(task);

out:
        rpc_clnt_sigunmask(clnt, &oldset);              

        return status;
}
494
495
496 void
497 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
498 {
499         task->tk_msg   = *msg;
500         task->tk_flags |= flags;
501         /* Bind the user cred */
502         if (task->tk_msg.rpc_cred != NULL)
503                 rpcauth_holdcred(task);
504         else
505                 rpcauth_bindcred(task);
506
507         if (task->tk_status == 0)
508                 task->tk_action = call_start;
509         else
510                 task->tk_action = NULL;
511 }
512
513 void
514 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
515 {
516         struct rpc_xprt *xprt = clnt->cl_xprt;
517
518         xprt->sndsize = 0;
519         if (sndsize)
520                 xprt->sndsize = sndsize + RPC_SLACK_SPACE;
521         xprt->rcvsize = 0;
522         if (rcvsize)
523                 xprt->rcvsize = rcvsize + RPC_SLACK_SPACE;
524         if (xprt_connected(xprt))
525                 xprt_sock_setbufsize(xprt);
526 }
527
528 /*
529  * Return size of largest payload RPC client can support, in bytes
530  *
531  * For stream transports, this is one RPC record fragment (see RFC
532  * 1831), as we don't support multi-record requests yet.  For datagram
533  * transports, this is the size of an IP packet minus the IP, UDP, and
534  * RPC header sizes.
535  */
/* Simple accessor: the limit itself is computed by the transport */
size_t rpc_max_payload(struct rpc_clnt *clnt)
{
        return clnt->cl_xprt->max_payload;
}
EXPORT_SYMBOL(rpc_max_payload);
541
542 /*
543  * Restart an (async) RPC call. Usually called from within the
544  * exit handler.
545  */
546 void
547 rpc_restart_call(struct rpc_task *task)
548 {
549         if (RPC_ASSASSINATED(task))
550                 return;
551
552         task->tk_action = call_start;
553 }
554
555 /*
556  * 0.  Initial state
557  *
558  *     Other FSM states can be visited zero or more times, but
559  *     this state is visited exactly once for each RPC.
560  */
static void
call_start(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
                clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
                (RPC_IS_ASYNC(task) ? "async" : "sync"));

        /* Increment call count */
        task->tk_msg.rpc_proc->p_count++;
        clnt->cl_stats->rpccnt++;
        /* Next state: reserve a transport request slot */
        task->tk_action = call_reserve;
}
575
576 /*
577  * 1.   Reserve an RPC call slot
578  */
static void
call_reserve(struct rpc_task *task)
{
        dprintk("RPC: %4d call_reserve\n", task->tk_pid);

        /* Detour through credential refresh if the cred is stale */
        if (!rpcauth_uptodatecred(task)) {
                task->tk_action = call_refresh;
                return;
        }

        task->tk_status  = 0;
        task->tk_action  = call_reserveresult;
        /* May sleep; the outcome is examined in call_reserveresult() */
        xprt_reserve(task);
}
593
594 /*
595  * 1b.  Grok the result of xprt_reserve()
596  */
static void
call_reserveresult(struct rpc_task *task)
{
        int status = task->tk_status;

        dprintk("RPC: %4d call_reserveresult (status %d)\n",
                                task->tk_pid, task->tk_status);

        /*
         * After a call to xprt_reserve(), we must have either
         * a request slot or else an error status.
         */
        task->tk_status = 0;
        if (status >= 0) {
                if (task->tk_rqstp) {
                        task->tk_action = call_allocate;
                        return;
                }

                /* Success reported but no slot attached: internal bug */
                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
                                __FUNCTION__, status);
                rpc_exit(task, -EIO);
                return;
        }

        /*
         * Even though there was an error, we may have acquired
         * a request slot somehow.  Make sure not to leak it.
         */
        if (task->tk_rqstp) {
                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
                                __FUNCTION__, status);
                xprt_release(task);
        }

        switch (status) {
        case -EAGAIN:   /* woken up; retry */
                task->tk_action = call_reserve;
                return;
        case -EIO:      /* probably a shutdown */
                break;
        default:
                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
                                __FUNCTION__, status);
                break;
        }
        rpc_exit(task, status);
}
645
646 /*
647  * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
648  *      (Note: buffer memory is freed in rpc_task_release).
649  */
static void
call_allocate(struct rpc_task *task)
{
        unsigned int    bufsiz;

        dprintk("RPC: %4d call_allocate (status %d)\n", 
                                task->tk_pid, task->tk_status);
        task->tk_action = call_bind;
        /* A buffer may survive from an earlier pass through the FSM */
        if (task->tk_buffer)
                return;

        /* FIXME: compute buffer requirements more exactly using
         * auth->au_wslack */
        bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;

        /* Doubled: call_encode() splits the buffer into send/recv halves */
        if (rpc_malloc(task, bufsiz << 1) != NULL)
                return;
        printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); 

        /* Out of memory: back off and retry from the reserve step,
         * unless a signal-interruptible sync task was signalled. */
        if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
                xprt_release(task);
                task->tk_action = call_reserve;
                rpc_delay(task, HZ>>4);
                return;
        }

        rpc_exit(task, -ERESTARTSYS);
}
678
679 /*
680  * 3.   Encode arguments of an RPC call
681  */
static void
call_encode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct xdr_buf *sndbuf = &req->rq_snd_buf;
        struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
        unsigned int    bufsiz;
        kxdrproc_t      encode;
        int             status;
        u32             *p;

        dprintk("RPC: %4d call_encode (status %d)\n", 
                                task->tk_pid, task->tk_status);

        /* Default buffer setup: the task buffer is split into two equal
         * halves, the first for sending and the second for receiving. */
        bufsiz = task->tk_bufsize >> 1;
        sndbuf->head[0].iov_base = (void *)task->tk_buffer;
        sndbuf->head[0].iov_len  = bufsiz;
        sndbuf->tail[0].iov_len  = 0;
        sndbuf->page_len         = 0;
        sndbuf->len              = 0;
        sndbuf->buflen           = bufsiz;
        rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
        rcvbuf->head[0].iov_len  = bufsiz;
        rcvbuf->tail[0].iov_len  = 0;
        rcvbuf->page_len         = 0;
        rcvbuf->len              = 0;
        rcvbuf->buflen           = bufsiz;

        /* Encode header and provided arguments */
        encode = task->tk_msg.rpc_proc->p_encode;
        if (!(p = call_header(task))) {
                printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
                rpc_exit(task, -EIO);
                return;
        }
        /* Procedures without an encode op (e.g. NULL pings) skip this */
        if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
                                                 task->tk_msg.rpc_argp)) < 0) {
                printk(KERN_WARNING "%s: can't encode arguments: %d\n",
                                clnt->cl_protname, -status);
                rpc_exit(task, status);
        }
}
726
727 /*
728  * 4.   Get the server port number if not yet set
729  */
static void
call_bind(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;

        dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        /* Default next state; overridden below if we still need a port */
        task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_connect;

        if (!clnt->cl_port) {
                /* No server port known: ask the portmapper first, then
                 * fall through to the connect step. */
                task->tk_action = call_connect;
                task->tk_timeout = RPC_CONNECT_TIMEOUT;
                rpc_getport(task, clnt);
        }
}
747
748 /*
749  * 4a.  Connect to the RPC server (TCP case)
750  */
static void
call_connect(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %4d call_connect status %d\n",
                                task->tk_pid, task->tk_status);

        /* Already connected (e.g. by a concurrent task): go transmit */
        if (xprt_connected(clnt->cl_xprt)) {
                task->tk_action = call_transmit;
                return;
        }
        task->tk_action = call_connect_status;
        /* An earlier error (e.g. from rpc_getport) is sorted out in
         * call_connect_status without attempting the connect. */
        if (task->tk_status < 0)
                return;
        xprt_connect(task);
}
768
769 /*
770  * 4b. Sort out connect result
771  */
static void
call_connect_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;

        task->tk_status = 0;
        if (status >= 0) {
                clnt->cl_stats->netreconn++;
                task->tk_action = call_transmit;
                return;
        }

        /* Something failed: we may have to rebind */
        if (clnt->cl_autobind)
                clnt->cl_port = 0;
        switch (status) {
        case -ENOTCONN:
        case -ETIMEDOUT:
        case -EAGAIN:
                /* Transient failure: rebind if the port was cleared,
                 * otherwise just retry the connect. */
                task->tk_action = (clnt->cl_port == 0) ? call_bind : call_connect;
                break;
        default:
                rpc_exit(task, -EIO);
        }
}
798
799 /*
800  * 5.   Transmit the RPC request, and wait for reply
801  */
static void
call_transmit(struct rpc_task *task)
{
        dprintk("RPC: %4d call_transmit (status %d)\n", 
                                task->tk_pid, task->tk_status);

        task->tk_action = call_status;
        if (task->tk_status < 0)
                return;
        task->tk_status = xprt_prepare_transmit(task);
        if (task->tk_status != 0)
                return;
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        if (!task->tk_rqstp->rq_bytes_sent)
                call_encode(task);
        if (task->tk_status < 0)
                return;
        xprt_transmit(task);
        if (task->tk_status < 0)
                return;
        /* Procedures with no decode op expect no reply: finish the task
         * immediately after a successful transmit. */
        if (!task->tk_msg.rpc_proc->p_decode) {
                task->tk_action = NULL;
                rpc_wake_up_task(task);
        }
}
827
828 /*
829  * 6.   Sort out the RPC call status
830  */
static void
call_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        int             status;

        /* A fully-sent request with a received reply supersedes any
         * stale tk_status from the transmit path. */
        if (req->rq_received > 0 && !req->rq_bytes_sent)
                task->tk_status = req->rq_received;

        dprintk("RPC: %4d call_status (status %d)\n", 
                                task->tk_pid, task->tk_status);

        status = task->tk_status;
        if (status >= 0) {
                task->tk_action = call_decode;
                return;
        }

        task->tk_status = 0;
        switch(status) {
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                break;
        case -ECONNREFUSED:
        case -ENOTCONN:
                /* Connection lost: restart transmission from scratch,
                 * rebinding first if autobind is enabled. */
                req->rq_bytes_sent = 0;
                if (clnt->cl_autobind)
                        clnt->cl_port = 0;
                task->tk_action = call_bind;
                break;
        case -EAGAIN:
                task->tk_action = call_transmit;
                break;
        case -EIO:
                /* shutdown or soft timeout */
                rpc_exit(task, status);
                break;
        default:
                if (clnt->cl_chatty)
                        printk("%s: RPC call returned error %d\n",
                               clnt->cl_protname, -status);
                rpc_exit(task, status);
                break;
        }
}
877
878 /*
879  * 6a.  Handle RPC timeout
880  *      We do not release the request slot, so we keep using the
881  *      same XID for all retransmits.
882  */
static void
call_timeout(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        /* Minor timeout: the transport still has retransmits left */
        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
                dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
                goto retry;
        }

        dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
        /* Soft tasks give up on a major timeout instead of retrying */
        if (RPC_IS_SOFT(task)) {
                if (clnt->cl_chatty)
                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_protname, clnt->cl_server);
                rpc_exit(task, -EIO);
                return;
        }

        /* Warn once per major timeout episode (cleared in call_decode) */
        if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
                printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
                        clnt->cl_protname, clnt->cl_server);
        }
        if (clnt->cl_autobind)
                clnt->cl_port = 0;

retry:
        clnt->cl_stats->rpcretrans++;
        task->tk_action = call_bind;
        task->tk_status = 0;
}
915
916 /*
917  * 7.   Decode the RPC reply
918  */
static void
call_decode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
        u32             *p;

        dprintk("RPC: %4d call_decode (status %d)\n", 
                                task->tk_pid, task->tk_status);

        /* The server answered after a major timeout: report recovery */
        if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
                printk(KERN_NOTICE "%s: server %s OK\n",
                        clnt->cl_protname, clnt->cl_server);
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
        }

        /* Replies shorter than 12 bytes cannot hold a valid RPC header;
         * hard tasks retransmit, soft tasks fail with EIO. */
        if (task->tk_status < 12) {
                if (!RPC_IS_SOFT(task)) {
                        task->tk_action = call_bind;
                        clnt->cl_stats->rpcretrans++;
                        goto out_retry;
                }
                printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
                        clnt->cl_protname, task->tk_status);
                rpc_exit(task, -EIO);
                return;
        }

        req->rq_rcv_buf.len = req->rq_private_buf.len;

        /* Check that the softirq receive buffer is valid */
        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
                                sizeof(req->rq_rcv_buf)) != 0);

        /* Verify the RPC header */
        if (!(p = call_verify(task))) {
                /* call_verify() either terminated the task (tk_action
                 * NULL) or set up a retry state; retry path resets the
                 * receive bookkeeping below. */
                if (task->tk_action == NULL)
                        return;
                goto out_retry;
        }

        task->tk_action = NULL;

        if (decode)
                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
                                                      task->tk_msg.rpc_resp);
        dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
                                        task->tk_status);
        return;
out_retry:
        /* Discard the reply so the retransmit waits for a fresh one */
        req->rq_received = req->rq_private_buf.len = 0;
        task->tk_status = 0;
}
973
974 /*
975  * 8.   Refresh the credentials if rejected by the server
976  */
977 static void
978 call_refresh(struct rpc_task *task)
979 {
980         dprintk("RPC: %4d call_refresh\n", task->tk_pid);
981
982         xprt_release(task);     /* Must do to obtain new XID */
983         task->tk_action = call_refreshresult;
984         task->tk_status = 0;
985         task->tk_client->cl_stats->rpcauthrefresh++;
986         rpcauth_refreshcred(task);
987 }
988
989 /*
990  * 8a.  Process the results of a credential refresh
991  */
992 static void
993 call_refreshresult(struct rpc_task *task)
994 {
995         int status = task->tk_status;
996         dprintk("RPC: %4d call_refreshresult (status %d)\n", 
997                                 task->tk_pid, task->tk_status);
998
999         task->tk_status = 0;
1000         task->tk_action = call_reserve;
1001         if (status >= 0 && rpcauth_uptodatecred(task))
1002                 return;
1003         if (status == -EACCES) {
1004                 rpc_exit(task, -EACCES);
1005                 return;
1006         }
1007         task->tk_action = call_refresh;
1008         if (status != -ETIMEDOUT)
1009                 rpc_delay(task, 3*HZ);
1010         return;
1011 }
1012
1013 /*
1014  * Call header serialization
1015  */
1016 static u32 *
1017 call_header(struct rpc_task *task)
1018 {
1019         struct rpc_clnt *clnt = task->tk_client;
1020         struct rpc_xprt *xprt = clnt->cl_xprt;
1021         struct rpc_rqst *req = task->tk_rqstp;
1022         u32             *p = req->rq_svec[0].iov_base;
1023
1024         /* FIXME: check buffer size? */
1025         if (xprt->stream)
1026                 *p++ = 0;               /* fill in later */
1027         *p++ = req->rq_xid;             /* XID */
1028         *p++ = htonl(RPC_CALL);         /* CALL */
1029         *p++ = htonl(RPC_VERSION);      /* RPC version */
1030         *p++ = htonl(clnt->cl_prog);    /* program number */
1031         *p++ = htonl(clnt->cl_vers);    /* program version */
1032         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1033         p = rpcauth_marshcred(task, p);
1034         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1035         return p;
1036 }
1037
/*
 * Reply header verification
 *      Parses and sanity-checks the RPC reply header.  Returns a pointer
 *      just past the accept status (start of the procedure results), or
 *      NULL after either scheduling a retry (tk_action set) or
 *      terminating the task via rpc_exit() (tk_action left NULL).
 */
static u32 *
call_verify(struct rpc_task *task)
{
        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
        /* Remaining reply length, counted in 32-bit XDR words. */
        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
        u32     *p = iov->iov_base, n;
        int error = -EACCES;

        /* Need at least XID, direction and reply status. */
        if ((len -= 3) < 0)
                goto out_overflow;
        p += 1; /* skip XID */

        if ((n = ntohl(*p++)) != RPC_REPLY) {
                printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
                goto out_retry;
        }
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                /* Call was rejected: the next word says why. */
                if (--len < 0)
                        goto out_overflow;
                switch ((n = ntohl(*p++))) {
                        case RPC_AUTH_ERROR:
                                break;
                        case RPC_MISMATCH:
                                dprintk("%s: RPC call version mismatch!\n", __FUNCTION__);
                                error = -EPROTONOSUPPORT;
                                goto out_err;
                        default:
                                dprintk("%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
                                goto out_eio;
                }
                /* RPC_AUTH_ERROR: an auth status word follows. */
                if (--len < 0)
                        goto out_overflow;
                switch ((n = ntohl(*p++))) {
                case RPC_AUTH_REJECTEDCRED:
                case RPC_AUTH_REJECTEDVERF:
                case RPCSEC_GSS_CREDPROBLEM:
                case RPCSEC_GSS_CTXPROBLEM:
                        /* Stale credentials: refresh and retry, bounded by
                         * tk_cred_retry. */
                        if (!task->tk_cred_retry)
                                break;
                        task->tk_cred_retry--;
                        dprintk("RPC: %4d call_verify: retry stale creds\n",
                                                        task->tk_pid);
                        rpcauth_invalcred(task);
                        task->tk_action = call_refresh;
                        return NULL;
                case RPC_AUTH_BADCRED:
                case RPC_AUTH_BADVERF:
                        /* possibly garbled cred/verf? */
                        if (!task->tk_garb_retry)
                                break;
                        task->tk_garb_retry--;
                        dprintk("RPC: %4d call_verify: retry garbled creds\n",
                                                        task->tk_pid);
                        task->tk_action = call_bind;
                        return NULL;
                case RPC_AUTH_TOOWEAK:
                        printk(KERN_NOTICE "call_verify: server requires stronger "
                               "authentication.\n");
                        break;
                default:
                        printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
                        error = -EIO;
                }
                dprintk("RPC: %4d call_verify: call rejected %d\n",
                                                task->tk_pid, n);
                goto out_err;
        }
        /* Accepted: verify the server's auth verifier. */
        if (!(p = rpcauth_checkverf(task, p))) {
                printk(KERN_WARNING "call_verify: auth check failed\n");
                goto out_retry;         /* bad verifier, retry */
        }
        /* NOTE(review): 'len' here becomes the word count consumed so far
         * and is only compared against 0, not against the reply length --
         * truncation after the verifier appears to go undetected here.
         * Confirm whether a stricter bound check is wanted. */
        len = p - (u32 *)iov->iov_base - 1;
        if (len < 0)
                goto out_overflow;
        /* Finally, the accept status word. */
        switch ((n = ntohl(*p++))) {
        case RPC_SUCCESS:
                return p;
        case RPC_PROG_UNAVAIL:
                dprintk("RPC: call_verify: program %u is unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                task->tk_client->cl_server);
                error = -EPFNOSUPPORT;
                goto out_err;
        case RPC_PROG_MISMATCH:
                dprintk("RPC: call_verify: program %u, version %u unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                (unsigned int)task->tk_client->cl_vers,
                                task->tk_client->cl_server);
                error = -EPROTONOSUPPORT;
                goto out_err;
        case RPC_PROC_UNAVAIL:
                dprintk("RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
                                task->tk_msg.rpc_proc,
                                task->tk_client->cl_prog,
                                task->tk_client->cl_vers,
                                task->tk_client->cl_server);
                error = -EOPNOTSUPP;
                goto out_err;
        case RPC_GARBAGE_ARGS:
                dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__);
                break;                  /* retry */
        default:
                printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
                /* Also retry */
        }

out_retry:
        /* Garbage path: re-send the call while tk_garb_retry lasts. */
        task->tk_client->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
                dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
                task->tk_action = call_bind;
                return NULL;
        }
        printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
out_eio:
        error = -EIO;
out_err:
        rpc_exit(task, error);
        return NULL;
out_overflow:
        printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
        goto out_retry;
}
1165
/*
 * The NULL procedure carries no arguments and no results, so both XDR
 * routines are no-ops that always succeed.
 */
static int rpcproc_encode_null(void *rqstp, u32 *data, void *obj)
{
        return 0;
}

static int rpcproc_decode_null(void *rqstp, u32 *data, void *obj)
{
        return 0;
}

/* Procedure 0 descriptor used by rpc_ping(). */
static struct rpc_procinfo rpcproc_null = {
        .p_encode = rpcproc_encode_null,
        .p_decode = rpcproc_decode_null,
};
1180
1181 int rpc_ping(struct rpc_clnt *clnt, int flags)
1182 {
1183         struct rpc_message msg = {
1184                 .rpc_proc = &rpcproc_null,
1185         };
1186         int err;
1187         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1188         err = rpc_call_sync(clnt, &msg, flags);
1189         put_rpccred(msg.rpc_cred);
1190         return err;
1191 }