/* SMP locking strategy:
*
- * svc_serv->sv_lock protects most stuff for that service.
+ * svc_pool->sp_lock protects most of the fields of that pool.
+ * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
+ * when both need to be taken (rare), svc_serv->sv_lock is first.
+ * BKL protects svc_serv->sv_nrthread.
* svc_sock->sk_defer_lock protects the svc_sock->sk_deferred list
+ * svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply.
*
* Some flags can be set to certain values at any time
* providing that certain rules are followed:
*
- * SK_BUSY can be set to 0 at any time.
- * svc_sock_enqueue must be called afterwards
* SK_CONN, SK_DATA, can be set or cleared at any time.
* after a set, svc_sock_enqueue must be called.
* after a clear, the socket must be read/accepted
static int svc_conn_age_period = 6*60;
/*
- * Queue up an idle server thread. Must have serv->sv_lock held.
+ * Queue up an idle server thread. Must have pool->sp_lock held.
* Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't polute
+ * use as many different threads as we need, and the rest don't pollute
* the cache.
*/
static inline void
-svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp)
+svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
- list_add(&rqstp->rq_list, &serv->sv_threads);
+ list_add(&rqstp->rq_list, &pool->sp_threads);
}
/*
- * Dequeue an nfsd thread. Must have serv->sv_lock held.
+ * Dequeue an nfsd thread. Must have pool->sp_lock held.
*/
static inline void
-svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp)
+svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
list_del(&rqstp->rq_list);
}
svc_sock_enqueue(struct svc_sock *svsk)
{
struct svc_serv *serv = svsk->sk_server;
+ struct svc_pool *pool;
struct svc_rqst *rqstp;
+ int cpu;
if (!(svsk->sk_flags &
( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)|(1<<SK_DEFERRED)) ))
if (test_bit(SK_DEAD, &svsk->sk_flags))
return;
- spin_lock_bh(&serv->sv_lock);
+ cpu = get_cpu();
+ pool = svc_pool_for_cpu(svsk->sk_server, cpu);
+ put_cpu();
+
+ spin_lock_bh(&pool->sp_lock);
- if (!list_empty(&serv->sv_threads) &&
- !list_empty(&serv->sv_sockets))
+ if (!list_empty(&pool->sp_threads) &&
+ !list_empty(&pool->sp_sockets))
printk(KERN_ERR
"svc_sock_enqueue: threads and sockets both waiting??\n");
goto out_unlock;
}
- if (test_bit(SK_BUSY, &svsk->sk_flags)) {
- /* Don't enqueue socket while daemon is receiving */
+ /* Mark socket as busy. It will remain in this state until the
+ * server has processed all pending data and put the socket back
+ * on the idle list. We update SK_BUSY atomically because
+ * it also guards against trying to enqueue the svc_sock twice.
+ */
+ if (test_and_set_bit(SK_BUSY, &svsk->sk_flags)) {
+ /* Don't enqueue socket while already enqueued */
dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
goto out_unlock;
}
+ BUG_ON(svsk->sk_pool != NULL);
+ svsk->sk_pool = pool;
set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
- if (((svsk->sk_reserved + serv->sv_bufsz)*2
+ if (((atomic_read(&svsk->sk_reserved) + serv->sv_bufsz)*2
> svc_sock_wspace(svsk))
&& !test_bit(SK_CLOSE, &svsk->sk_flags)
&& !test_bit(SK_CONN, &svsk->sk_flags)) {
/* Don't enqueue while not enough space for reply */
dprintk("svc: socket %p no space, %d*2 > %ld, not enqueued\n",
- svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz,
+ svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_bufsz,
svc_sock_wspace(svsk));
+ svsk->sk_pool = NULL;
+ clear_bit(SK_BUSY, &svsk->sk_flags);
goto out_unlock;
}
clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
- /* Mark socket as busy. It will remain in this state until the
- * server has processed all pending data and put the socket back
- * on the idle list.
- */
- set_bit(SK_BUSY, &svsk->sk_flags);
- if (!list_empty(&serv->sv_threads)) {
- rqstp = list_entry(serv->sv_threads.next,
+ if (!list_empty(&pool->sp_threads)) {
+ rqstp = list_entry(pool->sp_threads.next,
struct svc_rqst,
rq_list);
dprintk("svc: socket %p served by daemon %p\n",
svsk->sk_sk, rqstp);
- svc_serv_dequeue(serv, rqstp);
+ svc_thread_dequeue(pool, rqstp);
if (rqstp->rq_sock)
printk(KERN_ERR
"svc_sock_enqueue: server %p, rq_sock=%p!\n",
rqstp->rq_sock = svsk;
atomic_inc(&svsk->sk_inuse);
rqstp->rq_reserved = serv->sv_bufsz;
- svsk->sk_reserved += rqstp->rq_reserved;
+ atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
+ BUG_ON(svsk->sk_pool != pool);
wake_up(&rqstp->rq_wait);
} else {
dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
- list_add_tail(&svsk->sk_ready, &serv->sv_sockets);
+ list_add_tail(&svsk->sk_ready, &pool->sp_sockets);
+ BUG_ON(svsk->sk_pool != pool);
}
out_unlock:
- spin_unlock_bh(&serv->sv_lock);
+ spin_unlock_bh(&pool->sp_lock);
}
/*
- * Dequeue the first socket. Must be called with the serv->sv_lock held.
+ * Dequeue the first socket. Must be called with the pool->sp_lock held.
*/
static inline struct svc_sock *
-svc_sock_dequeue(struct svc_serv *serv)
+svc_sock_dequeue(struct svc_pool *pool)
{
struct svc_sock *svsk;
- if (list_empty(&serv->sv_sockets))
+ if (list_empty(&pool->sp_sockets))
return NULL;
- svsk = list_entry(serv->sv_sockets.next,
+ svsk = list_entry(pool->sp_sockets.next,
struct svc_sock, sk_ready);
list_del_init(&svsk->sk_ready);
static inline void
svc_sock_received(struct svc_sock *svsk)
{
+ svsk->sk_pool = NULL;
clear_bit(SK_BUSY, &svsk->sk_flags);
svc_sock_enqueue(svsk);
}
if (space < rqstp->rq_reserved) {
struct svc_sock *svsk = rqstp->rq_sock;
- spin_lock_bh(&svsk->sk_server->sv_lock);
- svsk->sk_reserved -= (rqstp->rq_reserved - space);
+ atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved);
rqstp->rq_reserved = space;
- spin_unlock_bh(&svsk->sk_server->sv_lock);
svc_sock_enqueue(svsk);
}
svc_release_skb(rqstp);
- svc_free_allpages(rqstp);
+ svc_free_res_pages(rqstp);
rqstp->rq_res.page_len = 0;
rqstp->rq_res.page_base = 0;
/*
* External function to wake up a server waiting for data
+ * This really only makes sense for services like lockd
+ * which have exactly one thread anyway.
*/
void
svc_wake_up(struct svc_serv *serv)
{
struct svc_rqst *rqstp;
-
- spin_lock_bh(&serv->sv_lock);
- if (!list_empty(&serv->sv_threads)) {
- rqstp = list_entry(serv->sv_threads.next,
- struct svc_rqst,
- rq_list);
- dprintk("svc: daemon %p woken up.\n", rqstp);
- /*
- svc_serv_dequeue(serv, rqstp);
- rqstp->rq_sock = NULL;
- */
- wake_up(&rqstp->rq_wait);
+ unsigned int i;
+ struct svc_pool *pool;
+
+ for (i = 0; i < serv->sv_nrpools; i++) {
+ pool = &serv->sv_pools[i];
+
+ spin_lock_bh(&pool->sp_lock);
+ if (!list_empty(&pool->sp_threads)) {
+ rqstp = list_entry(pool->sp_threads.next,
+ struct svc_rqst,
+ rq_list);
+ dprintk("svc: daemon %p woken up.\n", rqstp);
+ /*
+ svc_thread_dequeue(pool, rqstp);
+ rqstp->rq_sock = NULL;
+ */
+ wake_up(&rqstp->rq_wait);
+ }
+ spin_unlock_bh(&pool->sp_lock);
}
- spin_unlock_bh(&serv->sv_lock);
}
/*
/* send head */
if (slen == xdr->head[0].iov_len)
flags = 0;
- len = kernel_sendpage(sock, rqstp->rq_respages[0], 0, xdr->head[0].iov_len, flags);
+ len = kernel_sendpage(sock, rqstp->rq_respages[0], 0,
+ xdr->head[0].iov_len, flags);
if (len != xdr->head[0].iov_len)
goto out;
slen -= xdr->head[0].iov_len;
}
/* send tail */
if (xdr->tail[0].iov_len) {
- result = kernel_sendpage(sock, rqstp->rq_respages[rqstp->rq_restailpage],
- ((unsigned long)xdr->tail[0].iov_base)& (PAGE_SIZE-1),
+ result = kernel_sendpage(sock, rqstp->rq_respages[0],
+ ((unsigned long)xdr->tail[0].iov_base)
+ & (PAGE_SIZE-1),
xdr->tail[0].iov_len, 0);
if (result > 0)
}
spin_unlock(&serv->sv_lock);
if (closesk)
+ /* Should unregister with portmap, but you cannot
+ * unregister just one protocol...
+ */
svc_delete_socket(closesk);
+ else if (toclose)
+ return -ENOENT;
return len;
}
EXPORT_SYMBOL(svc_sock_names);
/* udp sockets need large rcvbuf as all pending
* requests are still in that buffer. sndbuf must
* also be large enough that there is enough space
- * for one reply per thread.
+ * for one reply per thread. We count all threads
+ * rather than threads in a particular pool, which
+ * provides an upper bound on the number of threads
+ * which will access the socket.
*/
svc_sock_setbufsize(svsk->sk_sock,
(serv->sv_nrthreads+3) * serv->sv_bufsz,
if (len <= rqstp->rq_arg.head[0].iov_len) {
rqstp->rq_arg.head[0].iov_len = len;
rqstp->rq_arg.page_len = 0;
+ rqstp->rq_respages = rqstp->rq_pages+1;
} else {
rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
- rqstp->rq_argused += (rqstp->rq_arg.page_len + PAGE_SIZE - 1)/ PAGE_SIZE;
+ rqstp->rq_respages = rqstp->rq_pages + 1 +
+ (rqstp->rq_arg.page_len + PAGE_SIZE - 1)/ PAGE_SIZE;
}
if (serv->sv_stats)
struct svc_sock *svsk = rqstp->rq_sock;
struct svc_serv *serv = svsk->sk_server;
int len;
- struct kvec vec[RPCSVC_MAXPAGES];
+ struct kvec *vec;
int pnum, vlen;
dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
/* sndbuf needs to have room for one request
* per thread, otherwise we can stall even when the
* network isn't a bottleneck.
+ *
+ * We count all threads rather than threads in a
+ * particular pool, which provides an upper bound
+ * on the number of threads which will access the socket.
+ *
* rcvbuf just needs to be able to hold a few requests.
* Normally they will be removed from the queue
* as soon a a complete request arrives.
len = svsk->sk_reclen;
set_bit(SK_DATA, &svsk->sk_flags);
+ vec = rqstp->rq_vec;
vec[0] = rqstp->rq_arg.head[0];
vlen = PAGE_SIZE;
pnum = 1;
while (vlen < len) {
- vec[pnum].iov_base = page_address(rqstp->rq_argpages[rqstp->rq_argused++]);
+ vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]);
vec[pnum].iov_len = PAGE_SIZE;
pnum++;
vlen += PAGE_SIZE;
}
+ rqstp->rq_respages = &rqstp->rq_pages[pnum];
/* Now receive data */
len = svc_recvfrom(rqstp, vec, pnum, len);
}
/*
- * Receive the next request on any socket.
+ * Receive the next request on any socket. This code is carefully
+ * organised not to touch any cachelines in the shared svc_serv
+ * structure, only cachelines in the local svc_pool.
*/
int
svc_recv(struct svc_rqst *rqstp, long timeout)
{
struct svc_sock *svsk =NULL;
struct svc_serv *serv = rqstp->rq_server;
- int len;
+ struct svc_pool *pool = rqstp->rq_pool;
+ int len, i;
int pages;
struct xdr_buf *arg;
DECLARE_WAITQUEUE(wait, current);
"svc_recv: service %p, wait queue active!\n",
rqstp);
- /* Initialize the buffers */
- /* first reclaim pages that were moved to response list */
- svc_pushback_allpages(rqstp);
/* now allocate needed pages. If we get a failure, sleep briefly */
pages = 2 + (serv->sv_bufsz + PAGE_SIZE -1) / PAGE_SIZE;
- while (rqstp->rq_arghi < pages) {
- struct page *p = alloc_page(GFP_KERNEL);
- if (!p) {
- schedule_timeout_uninterruptible(msecs_to_jiffies(500));
- continue;
+ for (i=0; i < pages ; i++)
+ while (rqstp->rq_pages[i] == NULL) {
+ struct page *p = alloc_page(GFP_KERNEL);
+ if (!p)
+ schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+ rqstp->rq_pages[i] = p;
}
- rqstp->rq_argpages[rqstp->rq_arghi++] = p;
- }
/* Make arg->head point to first page and arg->pages point to rest */
arg = &rqstp->rq_arg;
- arg->head[0].iov_base = page_address(rqstp->rq_argpages[0]);
+ arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
arg->head[0].iov_len = PAGE_SIZE;
- rqstp->rq_argused = 1;
- arg->pages = rqstp->rq_argpages + 1;
+ arg->pages = rqstp->rq_pages + 1;
arg->page_base = 0;
/* save at least one page for response */
arg->page_len = (pages-2)*PAGE_SIZE;
if (signalled())
return -EINTR;
- spin_lock_bh(&serv->sv_lock);
- if ((svsk = svc_sock_dequeue(serv)) != NULL) {
+ spin_lock_bh(&pool->sp_lock);
+ if ((svsk = svc_sock_dequeue(pool)) != NULL) {
rqstp->rq_sock = svsk;
atomic_inc(&svsk->sk_inuse);
rqstp->rq_reserved = serv->sv_bufsz;
- svsk->sk_reserved += rqstp->rq_reserved;
+ atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
} else {
/* No data pending. Go to sleep */
- svc_serv_enqueue(serv, rqstp);
+ svc_thread_enqueue(pool, rqstp);
/*
* We have to be able to interrupt this wait
*/
set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&rqstp->rq_wait, &wait);
- spin_unlock_bh(&serv->sv_lock);
+ spin_unlock_bh(&pool->sp_lock);
schedule_timeout(timeout);
try_to_freeze();
- spin_lock_bh(&serv->sv_lock);
+ spin_lock_bh(&pool->sp_lock);
remove_wait_queue(&rqstp->rq_wait, &wait);
if (!(svsk = rqstp->rq_sock)) {
- svc_serv_dequeue(serv, rqstp);
- spin_unlock_bh(&serv->sv_lock);
+ svc_thread_dequeue(pool, rqstp);
+ spin_unlock_bh(&pool->sp_lock);
dprintk("svc: server %p, no data yet\n", rqstp);
return signalled()? -EINTR : -EAGAIN;
}
}
- spin_unlock_bh(&serv->sv_lock);
+ spin_unlock_bh(&pool->sp_lock);
- dprintk("svc: server %p, socket %p, inuse=%d\n",
- rqstp, svsk, atomic_read(&svsk->sk_inuse));
+ dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n",
+ rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse));
len = svsk->sk_recvfrom(rqstp);
dprintk("svc: got len=%d\n", len);
if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags))
list_del_init(&svsk->sk_list);
- list_del_init(&svsk->sk_ready);
+ /*
+ * We used to delete the svc_sock from whichever list
+ * it's sk_ready node was on, but we don't actually
+ * need to. This is because the only time we're called
+ * while still attached to a queue, the queue itself
+ * is about to be destroyed (in svc_destroy).
+ */
if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags))
if (test_bit(SK_TEMP, &svsk->sk_flags))
serv->sv_tmpcnt--;
sockfd_put(svsk->sk_sock);
else
sock_release(svsk->sk_sock);
+ if (svsk->sk_info_authunix != NULL)
+ svcauth_unix_info_release(svsk->sk_info_authunix);
kfree(svsk);
} else {
spin_unlock_bh(&serv->sv_lock);
rqstp->rq_prot = dr->prot;
rqstp->rq_addr = dr->addr;
rqstp->rq_daddr = dr->daddr;
+ rqstp->rq_respages = rqstp->rq_pages;
return dr->argslen<<2;
}