/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(unsigned long ptr);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue;

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * the queue timer function, __rpc_queue_timer_fn().
 */
static void
__rpc_disable_timer(struct rpc_task *task)
{
	if (task->tk_timeout == 0)
		return;
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	queue->timer_list.expires = expires;
	mod_timer(&queue->timer_list.timer, expires);
}

/*
 * Set up a timer for the current task, using the owning wait queue's
 * single timer.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	task->u.tk_wait.expires = jiffies + task->tk_timeout;
	if (list_empty(&queue->timer_list.list) ||
	    time_before(task->u.tk_wait.expires, queue->timer_list.expires))
		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task))
		return;
	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
		del_singleshot_timer_sync(&task->tk_timer);
		dprintk("RPC: %5u deleting timer\n", task->tk_pid);
	}
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

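/*
 * An illustration of the structure built above (not code from this file):
 * with tasks A1, A2, A3 from one owner and B1 from another queued at the
 * same priority level, each owner occupies a single slot on the level's
 * list, and the extra tasks hang off the first one via u.tk_wait.links:
 *
 *	queue->tasks[prio]: A1 <-> B1
 *	                    |
 *	                    A1.u.tk_wait.links: A2 <-> A3
 *
 * __rpc_remove_wait_queue_priority() below undoes this by promoting A2 into
 * A1's slot and splicing the remaining links list onto A2.
 */
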
/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON(RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
	queue->owner = pid;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_owner(queue, 0);
}

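/*
 * Worked example of the counters set above (the values follow directly from
 * the expressions): at priority level 2 a queue gets count = 1 << (2 * 2) =
 * 16 wake-ups before __rpc_wake_up_next_priority() rotates to a lower level,
 * level 1 gets 4 and level 0 gets 1.  Independently, nr is reset to
 * RPC_BATCH_COUNT each time the owner changes, which bounds how many
 * consecutive wake-ups any single tk_owner can receive at one level.
 */
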
static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
	INIT_LIST_HEAD(&queue->timer_list.list);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	del_timer_sync(&queue->timer_list.timer);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

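/*
 * A minimal usage sketch for the wait queue API above (illustrative only;
 * the "foo" names are placeholders, not symbols from this file).  A user
 * initialises the queue once, parks tasks on it, and must call
 * rpc_destroy_wait_queue() before the queue memory disappears so that the
 * per-queue timer is stopped:
 *
 *	static struct rpc_wait_queue foo_queue;
 *
 *	rpc_init_wait_queue(&foo_queue, "fooq");   // single priority level
 *	...
 *	task->tk_timeout = 5 * HZ;                 // 0 would mean "no timeout"
 *	rpc_sleep_on(&foo_queue, task, NULL);      // woken with -ETIMEDOUT after 5s
 *	...
 *	rpc_wake_up(&foo_queue);                   // or rpc_wake_up_status()
 *	...
 *	rpc_destroy_wait_queue(&foo_queue);
 */
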
static int rpc_wait_bit_killable(void *word)
{
	if (fatal_signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

#ifdef RPC_DEBUG
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t rpc_pid;

	task->tk_magic = RPC_TASK_MAGIC_ID;
	task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	struct rpc_clnt *clnt;
	if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
		return;
	rpc_task_set_debuginfo(task);
	/* Add to global list of all tasks */
	clnt = task->tk_client;
	if (clnt != NULL) {
		spin_lock(&clnt->cl_lock);
		list_add_tail(&task->tk_task, &clnt->cl_tasks);
		spin_unlock(&clnt->cl_lock);
	}
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 */
static void rpc_mark_complete_task(struct rpc_task *task)
{
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	smp_mb__after_clear_bit();
	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
}

/*
 * Allow callers to wait for completion of an RPC call
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);

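/*
 * Sketch of how a caller waits for an asynchronous task to finish using the
 * helper above.  rpc_wait_for_completion_task() is assumed to be the sched.h
 * wrapper that passes a NULL action; if not, __rpc_wait_for_completion_task()
 * can be called directly.  rpc_run_task() lives in clnt.c:
 *
 *	struct rpc_task *task = rpc_run_task(&task_setup_data);
 *
 *	if (!IS_ERR(task)) {
 *		int err = rpc_wait_for_completion_task(task);
 *		if (err == 0)
 *			err = task->tk_status;
 *		rpc_put_task(task);
 *	}
 */
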
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	rpc_clear_queued(task);
	if (rpc_test_and_set_running(task))
		return;
	/* We might have raced */
	if (RPC_IS_QUEUED(task)) {
		rpc_clear_running(task);
		return;
	}
	if (RPC_IS_ASYNC(task)) {
		int status;

		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		status = queue_work(rpciod_workqueue, &task->u.tk_work);
		if (status < 0) {
			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
			task->tk_status = status;
			return;
		}
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(q, task);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	/* Mark the task as being activated if so needed */
	rpc_set_active(task);

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(task);

	dprintk("RPC:       __rpc_wake_up_task done\n");
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!RPC_IS_QUEUED(task) || task->tk_waitqueue != queue)
		return;
	if (rpc_start_wakeup(task)) {
		__rpc_do_wake_up_task(queue, task);
		rpc_finish_wakeup(task);
	}
}

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

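/*
 * The two halves of the sleep/wake-up handshake, sketched with made-up
 * names.  The task puts itself to sleep from inside one of its tk_action
 * routines; whoever later detects that the condition is satisfied wakes it
 * on the same queue.  Because rpc_wake_up_queued_task() re-checks
 * task->tk_waitqueue under the queue lock, a wake-up aimed at a queue the
 * task has already left is simply ignored:
 *
 *	static void foo_wait_for_reply(struct rpc_task *task)
 *	{
 *		task->tk_action = foo_handle_reply;	// runs after the wake-up
 *		task->tk_timeout = 5 * HZ;		// 0 means wait indefinitely
 *		rpc_sleep_on(&foo_queue, task, NULL);
 *	}
 *
 *	// completion path, possibly in bh context:
 *	rpc_wake_up_queued_task(&foo_queue, task);
 *
 * Waking the task only makes it runnable again (see rpc_make_runnable());
 * the next tk_action runs later, in rpciod or in the task's own process.
 */
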
/*
 * Wake up the specified task
 */
static void rpc_wake_up_task(struct rpc_task *task)
{
	rpc_wake_up_queued_task(task->tk_waitqueue, task);
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->owner == task->tk_owner) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_owner;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
	rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
	rpc_wake_up_task_queue_locked(queue, task);
	return task;
}

/*
 * Bind @task to the first entry on @head, if any; used by
 * rpc_wake_up_next() below.
 */
#define task_for_first(task, head) \
	if (!list_empty(head) && \
	    ((task = list_entry((head)->next, struct rpc_task, u.tk_wait.list)), 1))

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *task = NULL;

	dprintk("RPC:       wake_up_next(%p \"%s\")\n",
			queue, rpc_qname(queue));
	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			rpc_wake_up_task_queue_locked(queue, task);
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();

	return task;
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
			rpc_wake_up_task_queue_locked(queue, task);
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

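/*
 * rpc_wake_up_status() is typically used to fail every waiter at once when
 * the resource they are parked on goes away (the queue name below is only
 * an example).  Each task then sees the error in tk_status from its next
 * tk_action routine:
 *
 *	rpc_wake_up_status(&foo_queue, -EIO);
 *
 * rpc_wake_up(), by contrast, leaves tk_status alone.
 */
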
/*
 * Run a timeout function.  This is the handler for the legacy per-task
 * tk_timer; timeouts for queued tasks are normally driven by the per-queue
 * timer in __rpc_queue_timer_fn() below.
 */
static void rpc_run_timer(unsigned long ptr)
{
	struct rpc_task *task = (struct rpc_task *)ptr;
	struct rpc_wait_queue *queue = task->tk_waitqueue;

	spin_lock(&queue->lock);
	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue) {
		dprintk("RPC: %5u timeout\n", task->tk_pid);
		task->tk_status = -ETIMEDOUT;
		rpc_wake_up_task_queue_locked(queue, task);
	}
	spin_unlock(&queue->lock);
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	smp_mb__after_clear_bit();
}

static void __rpc_queue_timer_fn(unsigned long ptr)
{
	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			list_del_init(&task->u.tk_wait.timer_list);
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
}
EXPORT_SYMBOL_GPL(rpc_delay);

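/*
 * Sketch of rpc_delay() inside a task state routine (the foo_* names are
 * invented).  A soft failure is turned into a timed retry by picking the
 * next tk_action and then sleeping on the delay_queue above:
 *
 *	static void foo_call_done(struct rpc_task *task)
 *	{
 *		if (task->tk_status == -EAGAIN) {
 *			task->tk_status = 0;
 *			task->tk_action = foo_retry_request;
 *			rpc_delay(task, 3 * HZ);	// resume in ~3 seconds
 *			return;
 *		}
 *		...
 *	}
 *
 * When the queue timer fires, __rpc_queue_timer_fn() marks the task
 * -ETIMEDOUT and wakes it, and __rpc_atrun() (the tk_callback set above)
 * clears tk_status again before the new tk_action runs.
 */
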
/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
static void rpc_prepare_task(struct rpc_task *task)
{
	lock_kernel();
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
	unlock_kernel();
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		lock_kernel();
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		unlock_kernel();
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
		}
	}
}
EXPORT_SYMBOL_GPL(rpc_exit_task);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL) {
		lock_kernel();
		ops->rpc_release(calldata);
		unlock_kernel();
	}
}

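/*
 * The hooks called above come from a caller-supplied rpc_call_ops table.
 * A sketch of such a table (all foo_* names are invented for illustration):
 *
 *	static void foo_prepare(struct rpc_task *task, void *calldata)
 *	{
 *		rpc_call_start(task);		// proceed with the call proper
 *	}
 *
 *	static void foo_done(struct rpc_task *task, void *calldata)
 *	{
 *		struct foo_result *res = calldata;
 *		res->status = task->tk_status;
 *	}
 *
 *	static void foo_release(void *calldata)
 *	{
 *		kfree(calldata);
 *	}
 *
 *	static const struct rpc_call_ops foo_ops = {
 *		.rpc_call_prepare	= foo_prepare,
 *		.rpc_call_done		= foo_done,
 *		.rpc_release		= foo_release,
 *	};
 */
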
/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	int		status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {
		/*
		 * Garbage collection of pending timers...
		 */
		rpc_delete_timer(task);

		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 * - Dave
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (task->tk_action == NULL)
				break;
			task->tk_action(task);
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		rpc_clear_running(task);
		if (RPC_IS_ASYNC(task)) {
			/* Careful! we may have raced... */
			if (RPC_IS_QUEUED(task))
				return;
			if (rpc_test_and_set_running(task))
				return;
			continue;
		}

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
			rpc_wake_up_task(task);
		}
		rpc_set_running(task);
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *       released. In particular note that tk_release() will have
 *       been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_set_running(task);
	__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

struct rpc_buffer {
	size_t	len;
	char	data[];
};

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_buffer *buf;
	gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE)
		buf = mempool_alloc(rpc_buffer_mempool, gfp);
	else
		buf = kmalloc(size, gfp);

	if (!buf)
		return NULL;

	buf->len = size;
	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
			task->tk_pid, size, buf);
	return &buf->data;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
	size_t size;
	struct rpc_buffer *buf;

	if (!buffer)
		return;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	dprintk("RPC:       freeing buffer of size %zu at %p\n",
			size, buf);

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);

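/*
 * Sketch of the allocator contract above, roughly as the call_allocate step
 * in clnt.c uses it (details there differ; this only shows how rpc_malloc()
 * and rpc_free() pair up):
 *
 *	void *buf = rpc_malloc(task, bufsiz);
 *
 *	if (buf == NULL)
 *		rpc_delay(task, HZ >> 4);	// never sleep here: retry instead
 *	...
 *	rpc_free(buf);				// not kfree(): the hidden length
 *						// header picks mempool vs kmalloc
 *
 * The pointer handed out is &rpc_buffer->data, so callers never see the
 * len field.
 */
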
/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	setup_timer(&task->tk_timer, rpc_run_timer, (unsigned long)task);
	atomic_set(&task->tk_count, 1);
	task->tk_flags = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	task->tk_client = task_setup_data->rpc_client;
	if (task->tk_client != NULL) {
		kref_get(&task->tk_client->cl_kref);
		if (task->tk_client->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
	}

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	if (task_setup_data->rpc_message != NULL) {
		memcpy(&task->tk_msg, task_setup_data->rpc_message, sizeof(task->tk_msg));
		/* Bind the user cred */
		if (task->tk_msg.rpc_cred != NULL)
			rpcauth_holdcred(task);
		else
			rpcauth_bindcred(task);
		if (task->tk_action == NULL)
			rpc_call_start(task);
	}

	/* starting timestamp */
	task->tk_start = jiffies;

	dprintk("RPC:       new task initialized, procpid %u\n",
				task_pid_nr(current));
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void rpc_free_task_rcu(struct rcu_head *rcu)
{
	struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
	dprintk("RPC: %5u freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task *task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		if (task == NULL)
			goto out;
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);

	task->tk_flags |= flags;
	dprintk("RPC:       allocated task %p\n", task);
out:
	return task;
}

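/*
 * Putting it together: a hedged sketch of creating and running a task with
 * the helpers in this file.  Ordinary callers go through rpc_run_task() or
 * rpc_call_sync() in clnt.c instead; foo_ops and data are placeholders:
 *
 *	struct rpc_task_setup setup = {
 *		.rpc_client	= clnt,
 *		.rpc_message	= &msg,
 *		.callback_ops	= &foo_ops,
 *		.callback_data	= data,
 *		.flags		= RPC_TASK_ASYNC,
 *	};
 *	struct rpc_task *task;
 *
 *	task = rpc_new_task(&setup);
 *	if (task == NULL)
 *		return -ENOMEM;
 *	atomic_inc(&task->tk_count);	// keep a reference across execution
 *	rpc_execute(task);		// runs until the task first sleeps,
 *					// then rpciod drives it to completion
 *	...
 *	rpc_put_task(task);		// drop the caller's reference
 */
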
static void rpc_free_task(struct rpc_task *task)
{
	const struct rpc_call_ops *tk_ops = task->tk_ops;
	void *calldata = task->tk_calldata;

	if (task->tk_flags & RPC_TASK_DYNAMIC)
		call_rcu_bh(&task->u.tk_rcu, rpc_free_task_rcu);
	rpc_release_calldata(tk_ops, calldata);
}

static void rpc_async_release(struct work_struct *work)
{
	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}

void rpc_put_task(struct rpc_task *task)
{
	if (!atomic_dec_and_test(&task->tk_count))
		return;
	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}
	if (task->tk_workqueue != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(task->tk_workqueue, &task->u.tk_work);
	} else
		rpc_free_task(task);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

static void rpc_release_task(struct rpc_task *task)
{
#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	dprintk("RPC: %5u release task\n", task->tk_pid);

	if (!list_empty(&task->tk_task)) {
		struct rpc_clnt *clnt = task->tk_client;
		/* Remove from client task list */
		spin_lock(&clnt->cl_lock);
		list_del(&task->tk_task);
		spin_unlock(&clnt->cl_lock);
	}
	BUG_ON(RPC_IS_QUEUED(task));

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	/* Wake up anyone who is waiting for task completion */
	rpc_mark_complete_task(task);

	rpc_put_task(task);
}

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task *rovr;

	if (list_empty(&clnt->cl_tasks))
		return;
	dprintk("RPC:       killing all tasks for client %p\n", clnt);
	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&clnt->cl_lock);
	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
		if (!RPC_IS_ACTIVATED(rovr))
			continue;
		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	}
	spin_unlock(&clnt->cl_lock);
}
EXPORT_SYMBOL_GPL(rpc_killall_tasks);

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	dprintk("RPC:       creating workqueue rpciod\n");
	wq = create_workqueue("rpciod");
	rpciod_workqueue = wq;
	return rpciod_workqueue != NULL;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;
	dprintk("RPC:       destroying workqueue rpciod\n");

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp)
		kmem_cache_destroy(rpc_task_slabp);
	if (rpc_buffer_slabp)
		kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}