]> err.no Git - linux-2.6/blobdiff - net/sunrpc/sched.c
Merge with /home/shaggy/git/linus-clean/
[linux-2.6] / net / sunrpc / sched.c
index 8d6233d3248b2eefe20ce1e268f1db9008fe0e87..7415406aa1ae510639e24bc103ea42d3a862db03 100644 (file)
@@ -41,8 +41,6 @@ static mempool_t      *rpc_buffer_mempool __read_mostly;
 
 static void                    __rpc_default_timer(struct rpc_task *task);
 static void                    rpciod_killall(void);
-static void                    rpc_free(struct rpc_task *task);
-
 static void                    rpc_async_schedule(void *);
 
 /*
@@ -264,6 +262,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+       if (signal_pending(current))
+               return -ERESTARTSYS;
+       schedule();
+       return 0;
+}
+
+/*
+ * Mark an RPC call as having completed by clearing the 'active' bit
+ */
+static inline void rpc_mark_complete_task(struct rpc_task *task)
+{
+       rpc_clear_active(task);
+       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+}
+
+/*
+ * Allow callers to wait for completion of an RPC call
+ */
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
+{
+       if (action == NULL)
+               action = rpc_wait_bit_interruptible;
+       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+                       action, TASK_INTERRUPTIBLE);
+}
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+
 /*
  * Make an RPC task runnable.
  *
@@ -299,10 +326,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 static inline void
 rpc_schedule_run(struct rpc_task *task)
 {
-       /* Don't run a child twice! */
-       if (RPC_IS_ACTIVATED(task))
-               return;
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_make_runnable(task);
 }
 
@@ -324,8 +348,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
        }
 
        /* Mark the task as being activated if so needed */
-       if (!RPC_IS_ACTIVATED(task))
-               task->tk_active = 1;
+       rpc_set_active(task);
 
        __rpc_add_wait_queue(q, task);
 
@@ -554,6 +577,14 @@ __rpc_atrun(struct rpc_task *task)
        rpc_wake_up_task(task);
 }
 
+/*
+ * Helper to call task->tk_ops->rpc_call_prepare
+ */
+static void rpc_prepare_task(struct rpc_task *task)
+{
+       task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
+}
+
 /*
  * Helper that calls task->tk_ops->rpc_call_done if it exists
  */
@@ -566,20 +597,11 @@ void rpc_exit_task(struct rpc_task *task)
                        WARN_ON(RPC_ASSASSINATED(task));
                        /* Always release the RPC slot and buffer memory */
                        xprt_release(task);
-                       rpc_free(task);
                }
        }
 }
 EXPORT_SYMBOL(rpc_exit_task);
 
-static int rpc_wait_bit_interruptible(void *word)
-{
-       if (signal_pending(current))
-               return -ERESTARTSYS;
-       schedule();
-       return 0;
-}
-
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
@@ -669,9 +691,9 @@ static int __rpc_execute(struct rpc_task *task)
                dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
        }
 
-       dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
-       status = task->tk_status;
-
+       dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
+       /* Wake up anyone who is waiting for task completion */
+       rpc_mark_complete_task(task);
        /* Release all resources associated with the task */
        rpc_release_task(task);
        return status;
@@ -689,9 +711,7 @@ static int __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-       BUG_ON(task->tk_active);
-
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_set_running(task);
        return __rpc_execute(task);
 }
@@ -701,17 +721,19 @@ static void rpc_async_schedule(void *arg)
        __rpc_execute((struct rpc_task *)arg);
 }
 
-/*
- * Allocate memory for RPC purposes.
+/**
+ * rpc_malloc - allocate an RPC buffer
+ * @task: RPC task that will use this buffer
+ * @size: requested byte size
  *
  * We try to ensure that some NFS reads and writes can always proceed
  * by using a mempool when allocating 'small' buffers.
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
  */
-void *
-rpc_malloc(struct rpc_task *task, size_t size)
+void * rpc_malloc(struct rpc_task *task, size_t size)
 {
+       struct rpc_rqst *req = task->tk_rqstp;
        gfp_t   gfp;
 
        if (task->tk_flags & RPC_TASK_SWAPPER)
@@ -720,27 +742,33 @@ rpc_malloc(struct rpc_task *task, size_t size)
                gfp = GFP_NOFS;
 
        if (size > RPC_BUFFER_MAXSIZE) {
-               task->tk_buffer =  kmalloc(size, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = size;
+               req->rq_buffer = kmalloc(size, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = size;
        } else {
-               task->tk_buffer =  mempool_alloc(rpc_buffer_mempool, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = RPC_BUFFER_MAXSIZE;
+               req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = RPC_BUFFER_MAXSIZE;
        }
-       return task->tk_buffer;
+       return req->rq_buffer;
 }
 
-static void
-rpc_free(struct rpc_task *task)
+/**
+ * rpc_free - free buffer allocated via rpc_malloc
+ * @task: RPC task with a buffer to be freed
+ *
+ */
+void rpc_free(struct rpc_task *task)
 {
-       if (task->tk_buffer) {
-               if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
-                       mempool_free(task->tk_buffer, rpc_buffer_mempool);
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (req->rq_buffer) {
+               if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
+                       mempool_free(req->rq_buffer, rpc_buffer_mempool);
                else
-                       kfree(task->tk_buffer);
-               task->tk_buffer = NULL;
-               task->tk_bufsize = 0;
+                       kfree(req->rq_buffer);
+               req->rq_buffer = NULL;
+               req->rq_bufsize = 0;
        }
 }
 
@@ -753,9 +781,12 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+       atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags  = flags;
        task->tk_ops = tk_ops;
+       if (tk_ops->rpc_call_prepare != NULL)
+               task->tk_action = rpc_prepare_task;
        task->tk_calldata = calldata;
 
        /* Initialize retry counters */
@@ -838,11 +869,13 @@ void rpc_release_task(struct rpc_task *task)
 {
        const struct rpc_call_ops *tk_ops = task->tk_ops;
        void *calldata = task->tk_calldata;
-       dprintk("RPC: %4d release task\n", task->tk_pid);
 
 #ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
+       if (!atomic_dec_and_test(&task->tk_count))
+               return;
+       dprintk("RPC: %4d release task\n", task->tk_pid);
 
        /* Remove from global task list */
        spin_lock(&rpc_sched_lock);
@@ -850,7 +883,6 @@ void rpc_release_task(struct rpc_task *task)
        spin_unlock(&rpc_sched_lock);
 
        BUG_ON (RPC_IS_QUEUED(task));
-       task->tk_active = 0;
 
        /* Synchronously delete any running timer */
        rpc_delete_timer(task);
@@ -860,7 +892,6 @@ void rpc_release_task(struct rpc_task *task)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
-       rpc_free(task);
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
@@ -875,6 +906,27 @@ void rpc_release_task(struct rpc_task *task)
                tk_ops->rpc_release(calldata);
 }
 
+/**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt - pointer to RPC client
+ * @flags - RPC flags
+ * @ops - RPC call ops
+ * @data - user call data
+ */
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                                       const struct rpc_call_ops *ops,
+                                       void *data)
+{
+       struct rpc_task *task;
+       task = rpc_new_task(clnt, flags, ops, data);
+       if (task == NULL)
+               return ERR_PTR(-ENOMEM);
+       atomic_inc(&task->tk_count);
+       rpc_execute(task);
+       return task;
+}
+EXPORT_SYMBOL(rpc_run_task);
+
 /**
  * rpc_find_parent - find the parent of a child task.
  * @child: child task