/*
* RPC tasks sit here while waiting for conditions to improve.
*/
-static RPC_WAITQ(delay_queue, "delayq");
+static struct rpc_wait_queue delay_queue;
/*
* rpciod-related stuff
*/
-static DEFINE_MUTEX(rpciod_mutex);
-static atomic_t rpciod_users = ATOMIC_INIT(0);
struct workqueue_struct *rpciod_workqueue;
/*
if (unlikely(task->tk_priority > queue->maxpriority))
q = &queue->tasks[queue->maxpriority];
list_for_each_entry(t, q, u.tk_wait.list) {
- if (t->tk_cookie == task->tk_cookie) {
+ if (t->tk_owner == task->tk_owner) {
list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
return;
}
queue->count = 1 << (priority * 2);
}
-static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
+static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
- queue->cookie = cookie;
+ queue->owner = pid;
queue->nr = RPC_BATCH_COUNT;
}
static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
rpc_set_waitqueue_priority(queue, queue->maxpriority);
- rpc_set_waitqueue_cookie(queue, 0);
+ rpc_set_waitqueue_owner(queue, 0);
}
-static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
+static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
int i;
spin_lock_init(&queue->lock);
for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
INIT_LIST_HEAD(&queue->tasks[i]);
- queue->maxpriority = maxprio;
+ queue->maxpriority = nr_queues - 1;
rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
queue->name = qname;
void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
- __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
+ __rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
- __rpc_init_priority_wait_queue(queue, qname, 0);
+ __rpc_init_priority_wait_queue(queue, qname, 1);
}
-EXPORT_SYMBOL(rpc_init_wait_queue);
+EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
-static int rpc_wait_bit_interruptible(void *word)
+static int rpc_wait_bit_killable(void *word)
{
- if (signal_pending(current))
+ if (fatal_signal_pending(current))
return -ERESTARTSYS;
schedule();
return 0;
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
if (action == NULL)
- action = rpc_wait_bit_interruptible;
+ action = rpc_wait_bit_killable;
return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
- action, TASK_INTERRUPTIBLE);
+ action, TASK_KILLABLE);
}
-EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
/*
* Make an RPC task runnable.
__rpc_sleep_on(q, task, action, timer);
spin_unlock_bh(&q->lock);
}
+EXPORT_SYMBOL_GPL(rpc_sleep_on);
/**
* __rpc_do_wake_up_task - wake up a single rpc_task
}
rcu_read_unlock_bh();
}
+EXPORT_SYMBOL_GPL(rpc_wake_up_task);
/*
* Wake up the next task on a priority queue.
struct rpc_task *task;
/*
- * Service a batch of tasks from a single cookie.
+ * Service a batch of tasks from a single owner.
*/
q = &queue->tasks[queue->priority];
if (!list_empty(q)) {
task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
- if (queue->cookie == task->tk_cookie) {
+ if (queue->owner == task->tk_owner) {
if (--queue->nr)
goto out;
list_move_tail(&task->u.tk_wait.list, q);
* Check if we need to switch queues.
*/
if (--queue->count)
- goto new_cookie;
+ goto new_owner;
}
/*
new_queue:
rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
-new_cookie:
- rpc_set_waitqueue_cookie(queue, task->tk_cookie);
+new_owner:
+ rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
__rpc_wake_up_task(task);
return task;
return task;
}
+EXPORT_SYMBOL_GPL(rpc_wake_up_next);
/**
* rpc_wake_up - wake up all rpc_tasks
spin_unlock(&queue->lock);
rcu_read_unlock_bh();
}
+EXPORT_SYMBOL_GPL(rpc_wake_up);
/**
* rpc_wake_up_status - wake up all rpc_tasks and set their status value.
spin_unlock(&queue->lock);
rcu_read_unlock_bh();
}
+EXPORT_SYMBOL_GPL(rpc_wake_up_status);
static void __rpc_atrun(struct rpc_task *task)
{
task->tk_timeout = delay;
rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}
+EXPORT_SYMBOL_GPL(rpc_delay);
/*
* Helper to call task->tk_ops->rpc_call_prepare
}
}
}
-EXPORT_SYMBOL(rpc_exit_task);
+EXPORT_SYMBOL_GPL(rpc_exit_task);
void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
/* sync task: sleep here */
dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
- /* Note: Caller should be using rpc_clnt_sigmask() */
status = out_of_line_wait_on_bit(&task->tk_runstate,
- RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
- TASK_INTERRUPTIBLE);
+ RPC_TASK_QUEUED, rpc_wait_bit_killable,
+ TASK_KILLABLE);
if (status == -ERESTARTSYS) {
/*
* When a sync task receives a signal, it exits with
task->tk_pid, size, buf);
return &buf->data;
}
+EXPORT_SYMBOL_GPL(rpc_malloc);
/**
* rpc_free - free buffer allocated via rpc_malloc
else
kfree(buf);
}
+EXPORT_SYMBOL_GPL(rpc_free);
/*
* Creation and deletion of RPC task structures
*/
-void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
+static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
memset(task, 0, sizeof(*task));
- init_timer(&task->tk_timer);
- task->tk_timer.data = (unsigned long) task;
- task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+ setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
+ (unsigned long)task);
atomic_set(&task->tk_count, 1);
- task->tk_client = clnt;
- task->tk_flags = flags;
- task->tk_ops = tk_ops;
- if (tk_ops->rpc_call_prepare != NULL)
- task->tk_action = rpc_prepare_task;
- task->tk_calldata = calldata;
+ task->tk_flags = task_setup_data->flags;
+ task->tk_ops = task_setup_data->callback_ops;
+ task->tk_calldata = task_setup_data->callback_data;
INIT_LIST_HEAD(&task->tk_task);
/* Initialize retry counters */
task->tk_garb_retry = 2;
task->tk_cred_retry = 2;
- task->tk_priority = RPC_PRIORITY_NORMAL;
- task->tk_cookie = (unsigned long)current;
+ task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
+ task->tk_owner = current->tgid;
/* Initialize workqueue for async tasks */
task->tk_workqueue = rpciod_workqueue;
- if (clnt) {
- kref_get(&clnt->cl_kref);
- if (clnt->cl_softrtry)
+ task->tk_client = task_setup_data->rpc_client;
+ if (task->tk_client != NULL) {
+ kref_get(&task->tk_client->cl_kref);
+ if (task->tk_client->cl_softrtry)
task->tk_flags |= RPC_TASK_SOFT;
- if (!clnt->cl_intr)
- task->tk_flags |= RPC_TASK_NOINTR;
}
- BUG_ON(task->tk_ops == NULL);
+ if (task->tk_ops->rpc_call_prepare != NULL)
+ task->tk_action = rpc_prepare_task;
+
+ if (task_setup_data->rpc_message != NULL) {
+ memcpy(&task->tk_msg, task_setup_data->rpc_message, sizeof(task->tk_msg));
+ /* Bind the user cred */
+ if (task->tk_msg.rpc_cred != NULL)
+ rpcauth_holdcred(task);
+ else
+ rpcauth_bindcred(task);
+ if (task->tk_action == NULL)
+ rpc_call_start(task);
+ }
/* starting timestamp */
task->tk_start = jiffies;
dprintk("RPC: new task initialized, procpid %u\n",
- current->pid);
+ task_pid_nr(current));
}
static struct rpc_task *
/*
* Create a new task for the specified client.
*/
-struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
+struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
- struct rpc_task *task;
-
- task = rpc_alloc_task();
- if (!task)
- goto out;
+ struct rpc_task *task = setup_data->task;
+ unsigned short flags = 0;
+
+ if (task == NULL) {
+ task = rpc_alloc_task();
+ if (task == NULL)
+ goto out;
+ flags = RPC_TASK_DYNAMIC;
+ }
- rpc_init_task(task, clnt, flags, tk_ops, calldata);
+ rpc_init_task(task, setup_data);
+ task->tk_flags |= flags;
dprintk("RPC: allocated task %p\n", task);
- task->tk_flags |= RPC_TASK_DYNAMIC;
out:
return task;
}
call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
rpc_release_calldata(tk_ops, calldata);
}
-EXPORT_SYMBOL(rpc_put_task);
+EXPORT_SYMBOL_GPL(rpc_put_task);
static void rpc_release_task(struct rpc_task *task)
{
}
spin_unlock(&clnt->cl_lock);
}
+EXPORT_SYMBOL_GPL(rpc_killall_tasks);
+
+int rpciod_up(void)
+{
+ return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
+}
+
+void rpciod_down(void)
+{
+ module_put(THIS_MODULE);
+}
/*
- * Start up the rpciod process if it's not already running.
+ * Start up the rpciod workqueue.
*/
-int
-rpciod_up(void)
+static int rpciod_start(void)
{
struct workqueue_struct *wq;
- int error = 0;
-
- if (atomic_inc_not_zero(&rpciod_users))
- return 0;
-
- mutex_lock(&rpciod_mutex);
- /* Guard against races with rpciod_down() */
- if (rpciod_workqueue != NULL)
- goto out_ok;
/*
* Create the rpciod thread and wait for it to start.
*/
dprintk("RPC: creating workqueue rpciod\n");
- error = -ENOMEM;
wq = create_workqueue("rpciod");
- if (wq == NULL)
- goto out;
-
rpciod_workqueue = wq;
- error = 0;
-out_ok:
- atomic_inc(&rpciod_users);
-out:
- mutex_unlock(&rpciod_mutex);
- return error;
+ return rpciod_workqueue != NULL;
}
-void
-rpciod_down(void)
+static void rpciod_stop(void)
{
- if (!atomic_dec_and_test(&rpciod_users))
- return;
+ struct workqueue_struct *wq = NULL;
- mutex_lock(&rpciod_mutex);
+ if (rpciod_workqueue == NULL)
+ return;
dprintk("RPC: destroying workqueue rpciod\n");
- if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) {
- destroy_workqueue(rpciod_workqueue);
- rpciod_workqueue = NULL;
- }
- mutex_unlock(&rpciod_mutex);
+ wq = rpciod_workqueue;
+ rpciod_workqueue = NULL;
+ destroy_workqueue(wq);
}
void
rpc_destroy_mempool(void)
{
+ rpciod_stop();
if (rpc_buffer_mempool)
mempool_destroy(rpc_buffer_mempool);
if (rpc_task_mempool)
rpc_buffer_slabp);
if (!rpc_buffer_mempool)
goto err_nomem;
+ if (!rpciod_start())
+ goto err_nomem;
+ /*
+ * The following is not strictly a mempool initialisation,
+ * but there is no harm in doing it here
+ */
+ rpc_init_wait_queue(&delay_queue, "delayq");
return 0;
err_nomem:
rpc_destroy_mempool();