]> err.no Git - linux-2.6/blobdiff - net/sunrpc/sched.c
SUNRPC: Clean up the initialisation of priority queue scheduling info.
[linux-2.6] / net / sunrpc / sched.c
index 05825154ddd903ac86de6aaeb2f1c8e5a7ea887f..b9061bcf6fc155cce2eaa2e582b0c7168967f344 100644 (file)
@@ -47,23 +47,11 @@ static void                  rpc_release_task(struct rpc_task *task);
  */
 static RPC_WAITQ(delay_queue, "delayq");
 
-/*
- * All RPC clients are linked into this list
- */
-static LIST_HEAD(all_clients);
-
 /*
  * rpciod-related stuff
  */
-static DEFINE_MUTEX(rpciod_mutex);
-static atomic_t rpciod_users = ATOMIC_INIT(0);
 struct workqueue_struct *rpciod_workqueue;
 
-/*
- * Spinlock for other critical sections of code.
- */
-static DEFINE_SPINLOCK(rpc_sched_lock);
-
 /*
  * Disable the timer for a given RPC task. Should be called with
  * queue->lock and bh_disabled in order to avoid races within
@@ -147,7 +135,7 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
        if (unlikely(task->tk_priority > queue->maxpriority))
                q = &queue->tasks[queue->maxpriority];
        list_for_each_entry(t, q, u.tk_wait.list) {
-               if (t->tk_cookie == task->tk_cookie) {
+               if (t->tk_owner == task->tk_owner) {
                        list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
                        return;
                }
@@ -220,26 +208,26 @@ static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int
        queue->count = 1 << (priority * 2);
 }
 
-static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
+static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
 {
-       queue->cookie = cookie;
+       queue->owner = pid;
        queue->nr = RPC_BATCH_COUNT;
 }
 
 static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
 {
        rpc_set_waitqueue_priority(queue, queue->maxpriority);
-       rpc_set_waitqueue_cookie(queue, 0);
+       rpc_set_waitqueue_owner(queue, 0);
 }
 
-static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
+static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
 {
        int i;
 
        spin_lock_init(&queue->lock);
        for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
                INIT_LIST_HEAD(&queue->tasks[i]);
-       queue->maxpriority = maxprio;
+       queue->maxpriority = nr_queues - 1;
        rpc_reset_waitqueue_priority(queue);
 #ifdef RPC_DEBUG
        queue->name = qname;
@@ -248,14 +236,14 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
 
 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 {
-       __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
+       __rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
 }
 
 void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 {
-       __rpc_init_priority_wait_queue(queue, qname, 0);
+       __rpc_init_priority_wait_queue(queue, qname, 1);
 }
-EXPORT_SYMBOL(rpc_init_wait_queue);
+EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
 
 static int rpc_wait_bit_interruptible(void *word)
 {
@@ -315,7 +303,7 @@ int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
        return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                        action, TASK_INTERRUPTIBLE);
 }
-EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
 
 /*
  * Make an RPC task runnable.
@@ -385,6 +373,7 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
        __rpc_sleep_on(q, task, action, timer);
        spin_unlock_bh(&q->lock);
 }
+EXPORT_SYMBOL_GPL(rpc_sleep_on);
 
 /**
  * __rpc_do_wake_up_task - wake up a single rpc_task
@@ -456,6 +445,7 @@ void rpc_wake_up_task(struct rpc_task *task)
        }
        rcu_read_unlock_bh();
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_task);
 
 /*
  * Wake up the next task on a priority queue.
@@ -466,12 +456,12 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
        struct rpc_task *task;
 
        /*
-        * Service a batch of tasks from a single cookie.
+        * Service a batch of tasks from a single owner.
         */
        q = &queue->tasks[queue->priority];
        if (!list_empty(q)) {
                task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
-               if (queue->cookie == task->tk_cookie) {
+               if (queue->owner == task->tk_owner) {
                        if (--queue->nr)
                                goto out;
                        list_move_tail(&task->u.tk_wait.list, q);
@@ -480,7 +470,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
                 * Check if we need to switch queues.
                 */
                if (--queue->count)
-                       goto new_cookie;
+                       goto new_owner;
        }
 
        /*
@@ -502,8 +492,8 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
 
 new_queue:
        rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
-new_cookie:
-       rpc_set_waitqueue_cookie(queue, task->tk_cookie);
+new_owner:
+       rpc_set_waitqueue_owner(queue, task->tk_owner);
 out:
        __rpc_wake_up_task(task);
        return task;
@@ -531,6 +521,7 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
 
        return task;
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_next);
 
 /**
  * rpc_wake_up - wake up all rpc_tasks
@@ -556,6 +547,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up);
 
 /**
  * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
@@ -584,6 +576,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_status);
 
 static void __rpc_atrun(struct rpc_task *task)
 {
@@ -598,6 +591,7 @@ void rpc_delay(struct rpc_task *task, unsigned long delay)
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
 }
+EXPORT_SYMBOL_GPL(rpc_delay);
 
 /*
  * Helper to call task->tk_ops->rpc_call_prepare
@@ -626,7 +620,7 @@ void rpc_exit_task(struct rpc_task *task)
                }
        }
 }
-EXPORT_SYMBOL(rpc_exit_task);
+EXPORT_SYMBOL_GPL(rpc_exit_task);
 
 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
 {
@@ -743,6 +737,7 @@ void rpc_execute(struct rpc_task *task)
        rpc_set_running(task);
        __rpc_execute(task);
 }
+EXPORT_SYMBOL_GPL(rpc_execute);
 
 static void rpc_async_schedule(struct work_struct *work)
 {
@@ -789,6 +784,7 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
                        task->tk_pid, size, buf);
        return &buf->data;
 }
+EXPORT_SYMBOL_GPL(rpc_malloc);
 
 /**
  * rpc_free - free buffer allocated via rpc_malloc
@@ -814,51 +810,51 @@ void rpc_free(void *buffer)
        else
                kfree(buf);
 }
+EXPORT_SYMBOL_GPL(rpc_free);
 
 /*
  * Creation and deletion of RPC task structures
  */
-void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
+void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
 {
        memset(task, 0, sizeof(*task));
-       init_timer(&task->tk_timer);
-       task->tk_timer.data     = (unsigned long) task;
-       task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+       setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
+                       (unsigned long)task);
        atomic_set(&task->tk_count, 1);
-       task->tk_client = clnt;
-       task->tk_flags  = flags;
-       task->tk_ops = tk_ops;
-       if (tk_ops->rpc_call_prepare != NULL)
-               task->tk_action = rpc_prepare_task;
-       task->tk_calldata = calldata;
+       task->tk_flags  = task_setup_data->flags;
+       task->tk_ops = task_setup_data->callback_ops;
+       task->tk_calldata = task_setup_data->callback_data;
        INIT_LIST_HEAD(&task->tk_task);
 
        /* Initialize retry counters */
        task->tk_garb_retry = 2;
        task->tk_cred_retry = 2;
 
-       task->tk_priority = RPC_PRIORITY_NORMAL;
-       task->tk_cookie = (unsigned long)current;
+       task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
+       task->tk_owner = current->tgid;
 
        /* Initialize workqueue for async tasks */
        task->tk_workqueue = rpciod_workqueue;
 
-       if (clnt) {
-               kref_get(&clnt->cl_kref);
-               if (clnt->cl_softrtry)
+       task->tk_client = task_setup_data->rpc_client;
+       if (task->tk_client != NULL) {
+               kref_get(&task->tk_client->cl_kref);
+               if (task->tk_client->cl_softrtry)
                        task->tk_flags |= RPC_TASK_SOFT;
-               if (!clnt->cl_intr)
+               if (!task->tk_client->cl_intr)
                        task->tk_flags |= RPC_TASK_NOINTR;
        }
 
-       BUG_ON(task->tk_ops == NULL);
+       if (task->tk_ops->rpc_call_prepare != NULL)
+               task->tk_action = rpc_prepare_task;
 
        /* starting timestamp */
        task->tk_start = jiffies;
 
        dprintk("RPC:       new task initialized, procpid %u\n",
-                               current->pid);
+                               task_pid_nr(current));
 }
+EXPORT_SYMBOL_GPL(rpc_init_task);
 
 static struct rpc_task *
 rpc_alloc_task(void)
@@ -876,7 +872,7 @@ static void rpc_free_task(struct rcu_head *rcu)
 /*
  * Create a new task for the specified client.
  */
-struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
+struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
 {
        struct rpc_task *task;
 
@@ -884,7 +880,7 @@ struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc
        if (!task)
                goto out;
 
-       rpc_init_task(task, clnt, flags, tk_ops, calldata);
+       rpc_init_task(task, setup_data);
 
        dprintk("RPC:       allocated task %p\n", task);
        task->tk_flags |= RPC_TASK_DYNAMIC;
@@ -913,7 +909,7 @@ void rpc_put_task(struct rpc_task *task)
                call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
        rpc_release_calldata(tk_ops, calldata);
 }
-EXPORT_SYMBOL(rpc_put_task);
+EXPORT_SYMBOL_GPL(rpc_put_task);
 
 static void rpc_release_task(struct rpc_task *task)
 {
@@ -943,29 +939,6 @@ static void rpc_release_task(struct rpc_task *task)
        rpc_put_task(task);
 }
 
-/**
- * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
- * @clnt: pointer to RPC client
- * @flags: RPC flags
- * @ops: RPC call ops
- * @data: user call data
- */
-struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
-                                       const struct rpc_call_ops *ops,
-                                       void *data)
-{
-       struct rpc_task *task;
-       task = rpc_new_task(clnt, flags, ops, data);
-       if (task == NULL) {
-               rpc_release_calldata(ops, data);
-               return ERR_PTR(-ENOMEM);
-       }
-       atomic_inc(&task->tk_count);
-       rpc_execute(task);
-       return task;
-}
-EXPORT_SYMBOL(rpc_run_task);
-
 /*
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
@@ -993,113 +966,51 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
        }
        spin_unlock(&clnt->cl_lock);
 }
+EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 
-void rpc_register_client(struct rpc_clnt *clnt)
+int rpciod_up(void)
 {
-       spin_lock(&rpc_sched_lock);
-       list_add(&clnt->cl_clients, &all_clients);
-       spin_unlock(&rpc_sched_lock);
+       return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
 }
 
-void rpc_unregister_client(struct rpc_clnt *clnt)
+void rpciod_down(void)
 {
-       spin_lock(&rpc_sched_lock);
-       list_del(&clnt->cl_clients);
-       spin_unlock(&rpc_sched_lock);
+       module_put(THIS_MODULE);
 }
 
 /*
- * Start up the rpciod process if it's not already running.
+ * Start up the rpciod workqueue.
  */
-int
-rpciod_up(void)
+static int rpciod_start(void)
 {
        struct workqueue_struct *wq;
-       int error = 0;
-
-       if (atomic_inc_not_zero(&rpciod_users))
-               return 0;
-
-       mutex_lock(&rpciod_mutex);
 
-       /* Guard against races with rpciod_down() */
-       if (rpciod_workqueue != NULL)
-               goto out_ok;
        /*
         * Create the rpciod thread and wait for it to start.
         */
        dprintk("RPC:       creating workqueue rpciod\n");
-       error = -ENOMEM;
        wq = create_workqueue("rpciod");
-       if (wq == NULL)
-               goto out;
-
        rpciod_workqueue = wq;
-       error = 0;
-out_ok:
-       atomic_inc(&rpciod_users);
-out:
-       mutex_unlock(&rpciod_mutex);
-       return error;
+       return rpciod_workqueue != NULL;
 }
 
-void
-rpciod_down(void)
+static void rpciod_stop(void)
 {
-       if (!atomic_dec_and_test(&rpciod_users))
-               return;
+       struct workqueue_struct *wq = NULL;
 
-       mutex_lock(&rpciod_mutex);
+       if (rpciod_workqueue == NULL)
+               return;
        dprintk("RPC:       destroying workqueue rpciod\n");
 
-       if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) {
-               destroy_workqueue(rpciod_workqueue);
-               rpciod_workqueue = NULL;
-       }
-       mutex_unlock(&rpciod_mutex);
+       wq = rpciod_workqueue;
+       rpciod_workqueue = NULL;
+       destroy_workqueue(wq);
 }
 
-#ifdef RPC_DEBUG
-void rpc_show_tasks(void)
-{
-       struct rpc_clnt *clnt;
-       struct rpc_task *t;
-
-       spin_lock(&rpc_sched_lock);
-       if (list_empty(&all_clients))
-               goto out;
-       printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
-               "-rpcwait -action- ---ops--\n");
-       list_for_each_entry(clnt, &all_clients, cl_clients) {
-               if (list_empty(&clnt->cl_tasks))
-                       continue;
-               spin_lock(&clnt->cl_lock);
-               list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
-                       const char *rpc_waitq = "none";
-
-                       if (RPC_IS_QUEUED(t))
-                               rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
-
-                       printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
-                               t->tk_pid,
-                               (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
-                               t->tk_flags, t->tk_status,
-                               t->tk_client,
-                               (t->tk_client ? t->tk_client->cl_prog : 0),
-                               t->tk_rqstp, t->tk_timeout,
-                               rpc_waitq,
-                               t->tk_action, t->tk_ops);
-               }
-               spin_unlock(&clnt->cl_lock);
-       }
-out:
-       spin_unlock(&rpc_sched_lock);
-}
-#endif
-
 void
 rpc_destroy_mempool(void)
 {
+       rpciod_stop();
        if (rpc_buffer_mempool)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
@@ -1116,13 +1027,13 @@ rpc_init_mempool(void)
        rpc_task_slabp = kmem_cache_create("rpc_tasks",
                                             sizeof(struct rpc_task),
                                             0, SLAB_HWCACHE_ALIGN,
-                                            NULL, NULL);
+                                            NULL);
        if (!rpc_task_slabp)
                goto err_nomem;
        rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
                                             RPC_BUFFER_MAXSIZE,
                                             0, SLAB_HWCACHE_ALIGN,
-                                            NULL, NULL);
+                                            NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
        rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
@@ -1133,6 +1044,8 @@ rpc_init_mempool(void)
                                                      rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
+       if (!rpciod_start())
+               goto err_nomem;
        return 0;
 err_nomem:
        rpc_destroy_mempool();