]> err.no Git - linux-2.6/blobdiff - net/sunrpc/xprtsock.c
Merge git://git.kernel.org/pub/scm/linux/kernel/git/sam/kbuild
[linux-2.6] / net / sunrpc / xprtsock.c
index fc4fbe8ea346d2554cbf5bd87b079ddd84d3deed..c458f8d1d6d1090e193c1c1cd66a4c7552c92986 100644 (file)
@@ -28,6 +28,7 @@
 #include <linux/udp.h>
 #include <linux/tcp.h>
 #include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
 #include <linux/file.h>
 
 #include <net/sock.h>
 #include <net/tcp.h>
 
 /*
- * Maximum port number to use when requesting a reserved port.
+ * xprtsock tunables
  */
-#define XS_MAX_RESVPORT                (800U)
+unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
+unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
+
+unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
+unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
 
 /*
  * How many times to try sending a request on a socket before waiting
  */
 #define XS_SENDMSG_RETRY       (10U)
 
+/*
+ * Time out for an RPC UDP socket connect.  UDP socket connects are
+ * synchronous, but we set a timeout anyway in case of resource
+ * exhaustion on the local host.
+ */
+#define XS_UDP_CONN_TO         (5U * HZ)
+
+/*
+ * Wait duration for an RPC TCP connection to be established.  Solaris
+ * NFS over TCP uses 60 seconds, for example, which is in line with how
+ * long a server takes to reboot.
+ */
+#define XS_TCP_CONN_TO         (60U * HZ)
+
+/*
+ * Wait duration for a reply from the RPC portmapper.
+ */
+#define XS_BIND_TO             (60U * HZ)
+
+/*
+ * Delay if a UDP socket connect error occurs.  This is most likely some
+ * kind of resource problem on the local host.
+ */
+#define XS_UDP_REEST_TO                (2U * HZ)
+
+/*
+ * The reestablish timeout allows clients to delay for a bit before attempting
+ * to reconnect to a server that just dropped our connection.
+ *
+ * We implement an exponential backoff when trying to reestablish a TCP
+ * transport connection with the server.  Some servers like to drop a TCP
+ * connection when they are overworked, so we start with a short timeout and
+ * increase over time if the server is down or not responding.
+ */
+#define XS_TCP_INIT_REEST_TO   (3U * HZ)
+#define XS_TCP_MAX_REEST_TO    (5U * 60 * HZ)
+
+/*
+ * TCP idle timeout; client drops the transport socket if it is idle
+ * for this long.  Note that we also timeout UDP sockets to prevent
+ * holding port numbers when there is no RPC traffic.
+ */
+#define XS_IDLE_DISC_TO                (5U * 60 * HZ)
+
 #ifdef RPC_DEBUG
 # undef  RPC_DEBUG_DATA
 # define RPCDBG_FACILITY       RPCDBG_TRANS
@@ -367,6 +416,8 @@ static int xs_tcp_send_request(struct rpc_task *task)
  * xs_close - close a socket
  * @xprt: transport
  *
+ * This is used when all requests are complete; ie, no DRC state remains
+ * on the server we want to save.
  */
 static void xs_close(struct rpc_xprt *xprt)
 {
@@ -374,7 +425,7 @@ static void xs_close(struct rpc_xprt *xprt)
        struct sock *sk = xprt->inet;
 
        if (!sk)
-               return;
+               goto clear_close_wait;
 
        dprintk("RPC:      xs_close xprt %p\n", xprt);
 
@@ -391,6 +442,10 @@ static void xs_close(struct rpc_xprt *xprt)
        sk->sk_no_check = 0;
 
        sock_release(sock);
+clear_close_wait:
+       smp_mb__before_clear_bit();
+       clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+       smp_mb__after_clear_bit();
 }
 
 /**
@@ -460,8 +515,6 @@ static void xs_udp_data_ready(struct sock *sk, int len)
                goto out_unlock;
        task = rovr->rq_task;
 
-       dprintk("RPC: %4d received reply\n", task->tk_pid);
-
        if ((copied = rovr->rq_private_buf.buflen) > repsize)
                copied = repsize;
 
@@ -472,7 +525,9 @@ static void xs_udp_data_ready(struct sock *sk, int len)
        /* Something worked... */
        dst_confirm(skb->dst);
 
-       xprt_complete_rqst(xprt, rovr, copied);
+       xprt_adjust_cwnd(task, copied);
+       xprt_update_rtt(task);
+       xprt_complete_rqst(task, copied);
 
  out_unlock:
        spin_unlock(&xprt->transport_lock);
@@ -634,11 +689,8 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc
        }
 
 out:
-       if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
-               dprintk("RPC: %4d received reply complete\n",
-                               req->rq_task->tk_pid);
-               xprt_complete_rqst(xprt, req, xprt->tcp_copied);
-       }
+       if (!(xprt->tcp_flags & XPRT_COPY_DATA))
+               xprt_complete_rqst(req->rq_task, xprt->tcp_copied);
        spin_unlock(&xprt->transport_lock);
        xs_tcp_check_recm(xprt);
 }
@@ -745,6 +797,7 @@ static void xs_tcp_state_change(struct sock *sk)
                        xprt->tcp_reclen = 0;
                        xprt->tcp_copied = 0;
                        xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
+                       xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                        xprt_wake_pending_tasks(xprt, 0);
                }
                spin_unlock_bh(&xprt->transport_lock);
@@ -752,9 +805,13 @@ static void xs_tcp_state_change(struct sock *sk)
        case TCP_SYN_SENT:
        case TCP_SYN_RECV:
                break;
+       case TCP_CLOSE_WAIT:
+               /* Try to schedule an autoclose RPC calls */
+               set_bit(XPRT_CLOSE_WAIT, &xprt->state);
+               if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
+                       schedule_work(&xprt->task_cleanup);
        default:
                xprt_disconnect(xprt);
-               break;
        }
  out:
        read_unlock(&sk->sk_callback_lock);
@@ -826,15 +883,7 @@ static void xs_tcp_write_space(struct sock *sk)
        read_unlock(&sk->sk_callback_lock);
 }
 
-/**
- * xs_udp_set_buffer_size - set send and receive limits
- * @xprt: generic transport
- *
- * Set socket send and receive limits based on the
- * sndsize and rcvsize fields in the generic transport
- * structure.
- */
-static void xs_udp_set_buffer_size(struct rpc_xprt *xprt)
+static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
 {
        struct sock *sk = xprt->inet;
 
@@ -850,14 +899,46 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt)
 }
 
 /**
- * xs_tcp_set_buffer_size - set send and receive limits
+ * xs_udp_set_buffer_size - set send and receive limits
+ * @xprt: generic transport
+ * @sndsize: requested size of send buffer, in bytes
+ * @rcvsize: requested size of receive buffer, in bytes
+ *
+ * Set socket send and receive buffer size limits.
+ */
+static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
+{
+       xprt->sndsize = 0;
+       if (sndsize)
+               xprt->sndsize = sndsize + 1024;
+       xprt->rcvsize = 0;
+       if (rcvsize)
+               xprt->rcvsize = rcvsize + 1024;
+
+       xs_udp_do_set_buffer_size(xprt);
+}
+
+/**
+ * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
+ * @task: task that timed out
+ *
+ * Adjust the congestion window after a retransmit timeout has occurred.
+ */
+static void xs_udp_timer(struct rpc_task *task)
+{
+       xprt_adjust_cwnd(task, -ETIMEDOUT);
+}
+
+/**
+ * xs_set_port - reset the port number in the remote endpoint address
  * @xprt: generic transport
+ * @port: new port number
  *
- * Nothing to do for TCP.
  */
-static void xs_tcp_set_buffer_size(struct rpc_xprt *xprt)
+static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
 {
-       return;
+       dprintk("RPC:      setting port for xprt %p to %u\n", xprt, port);
+       xprt->addr.sin_port = htons(port);
 }
 
 static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
@@ -865,10 +946,9 @@ static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
        struct sockaddr_in myaddr = {
                .sin_family = AF_INET,
        };
-       int err, port;
+       int err;
+       unsigned short port = xprt->port;
 
-       /* Were we already bound to a given port? Try to reuse it */
-       port = xprt->port;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
@@ -879,8 +959,10 @@ static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
                                        port);
                        return 0;
                }
-               if (--port == 0)
-                       port = XS_MAX_RESVPORT;
+               if (port <= xprt_min_resvport)
+                       port = xprt_max_resvport;
+               else
+                       port--;
        } while (err == -EADDRINUSE && port != xprt->port);
 
        dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
@@ -929,6 +1011,7 @@ static void xs_udp_connect_worker(void *args)
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_write_space = xs_udp_write_space;
                sk->sk_no_check = UDP_CSUM_NORCV;
+               sk->sk_allocation = GFP_ATOMIC;
 
                xprt_set_connected(xprt);
 
@@ -938,13 +1021,37 @@ static void xs_udp_connect_worker(void *args)
 
                write_unlock_bh(&sk->sk_callback_lock);
        }
-       xs_udp_set_buffer_size(xprt);
+       xs_udp_do_set_buffer_size(xprt);
        status = 0;
 out:
        xprt_wake_pending_tasks(xprt, status);
        xprt_clear_connecting(xprt);
 }
 
+/*
+ * We need to preserve the port number so the reply cache on the server can
+ * find our cached RPC replies when we get around to reconnecting.
+ */
+static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
+{
+       int result;
+       struct socket *sock = xprt->sock;
+       struct sockaddr any;
+
+       dprintk("RPC:      disconnecting xprt %p to reuse port\n", xprt);
+
+       /*
+        * Disconnect the transport socket by doing a connect operation
+        * with AF_UNSPEC.  This should return immediately...
+        */
+       memset(&any, 0, sizeof(any));
+       any.sa_family = AF_UNSPEC;
+       result = sock->ops->connect(sock, &any, sizeof(any), 0);
+       if (result)
+               dprintk("RPC:      AF_UNSPEC connect return code %d\n",
+                               result);
+}
+
 /**
  * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
  * @args: RPC transport to connect
@@ -962,18 +1069,20 @@ static void xs_tcp_connect_worker(void *args)
 
        dprintk("RPC:      xs_tcp_connect_worker for xprt %p\n", xprt);
 
-       /* Start by resetting any existing socket state */
-       xs_close(xprt);
-
-       if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
-               dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
-               goto out;
-       }
+       if (!xprt->sock) {
+               /* start from scratch */
+               if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
+                       dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
+                       goto out;
+               }
 
-       if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
-               sock_release(sock);
-               goto out;
-       }
+               if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
+                       sock_release(sock);
+                       goto out;
+               }
+       } else
+               /* "close" the socket, preserving the local port */
+               xs_tcp_reuse_connection(xprt);
 
        if (!xprt->inet) {
                struct sock *sk = sock->sk;
@@ -987,7 +1096,13 @@ static void xs_tcp_connect_worker(void *args)
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
                sk->sk_write_space = xs_tcp_write_space;
-               tcp_sk(sk)->nonagle = 1;
+               sk->sk_allocation = GFP_ATOMIC;
+
+               /* socket options */
+               sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
+               sock_reset_flag(sk, SOCK_LINGER);
+               tcp_sk(sk)->linger2 = 0;
+               tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
 
                xprt_clear_connected(xprt);
 
@@ -1008,6 +1123,14 @@ static void xs_tcp_connect_worker(void *args)
                        case -EINPROGRESS:
                        case -EALREADY:
                                goto out_clear;
+                       case -ECONNREFUSED:
+                       case -ECONNRESET:
+                               /* retry with existing socket, after a delay */
+                               break;
+                       default:
+                               /* get rid of existing socket, and retry */
+                               xs_close(xprt);
+                               break;
                }
        }
 out:
@@ -1021,6 +1144,13 @@ out_clear:
  * @task: address of RPC task that manages state of connect request
  *
  * TCP: If the remote end dropped the connection, delay reconnecting.
+ *
+ * UDP socket connects are synchronous, but we use a work queue anyway
+ * to guarantee that even unprivileged user processes can set up a
+ * socket on a privileged port.
+ *
+ * If a UDP socket connect fails, the delay behavior here prevents
+ * retry floods (hard mounts).
  */
 static void xs_connect(struct rpc_task *task)
 {
@@ -1030,9 +1160,13 @@ static void xs_connect(struct rpc_task *task)
                return;
 
        if (xprt->sock != NULL) {
-               dprintk("RPC:      xs_connect delayed xprt %p\n", xprt);
+               dprintk("RPC:      xs_connect delayed xprt %p for %lu seconds\n",
+                               xprt, xprt->reestablish_timeout / HZ);
                schedule_delayed_work(&xprt->connect_worker,
-                                       RPC_REESTABLISH_TIMEOUT);
+                                       xprt->reestablish_timeout);
+               xprt->reestablish_timeout <<= 1;
+               if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
+                       xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
        } else {
                dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
                schedule_work(&xprt->connect_worker);
@@ -1046,26 +1180,32 @@ static void xs_connect(struct rpc_task *task)
 static struct rpc_xprt_ops xs_udp_ops = {
        .set_buffer_size        = xs_udp_set_buffer_size,
        .reserve_xprt           = xprt_reserve_xprt_cong,
+       .release_xprt           = xprt_release_xprt_cong,
+       .set_port               = xs_set_port,
        .connect                = xs_connect,
+       .buf_alloc              = rpc_malloc,
+       .buf_free               = rpc_free,
        .send_request           = xs_udp_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
+       .timer                  = xs_udp_timer,
+       .release_request        = xprt_release_rqst_cong,
        .close                  = xs_close,
        .destroy                = xs_destroy,
 };
 
 static struct rpc_xprt_ops xs_tcp_ops = {
-       .set_buffer_size        = xs_tcp_set_buffer_size,
        .reserve_xprt           = xprt_reserve_xprt,
+       .release_xprt           = xprt_release_xprt,
+       .set_port               = xs_set_port,
        .connect                = xs_connect,
+       .buf_alloc              = rpc_malloc,
+       .buf_free               = rpc_free,
        .send_request           = xs_tcp_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
        .close                  = xs_close,
        .destroy                = xs_destroy,
 };
 
-extern unsigned int xprt_udp_slot_table_entries;
-extern unsigned int xprt_tcp_slot_table_entries;
-
 /**
  * xs_setup_udp - Set up transport to use a UDP socket
  * @xprt: transport to set up
@@ -1086,15 +1226,17 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
        memset(xprt->slot, 0, slot_table_size);
 
        xprt->prot = IPPROTO_UDP;
-       xprt->port = XS_MAX_RESVPORT;
+       xprt->port = xprt_max_resvport;
        xprt->tsh_size = 0;
-       xprt->nocong = 0;
-       xprt->cwnd = RPC_INITCWND;
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
        /* XXX: header size can vary due to auth type, IPv6, etc. */
        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
        INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);
+       xprt->bind_timeout = XS_BIND_TO;
+       xprt->connect_timeout = XS_UDP_CONN_TO;
+       xprt->reestablish_timeout = XS_UDP_REEST_TO;
+       xprt->idle_timeout = XS_IDLE_DISC_TO;
 
        xprt->ops = &xs_udp_ops;
 
@@ -1126,14 +1268,16 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
        memset(xprt->slot, 0, slot_table_size);
 
        xprt->prot = IPPROTO_TCP;
-       xprt->port = XS_MAX_RESVPORT;
+       xprt->port = xprt_max_resvport;
        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
-       xprt->nocong = 1;
-       xprt->cwnd = RPC_MAXCWND(xprt);
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
        INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
+       xprt->bind_timeout = XS_BIND_TO;
+       xprt->connect_timeout = XS_TCP_CONN_TO;
+       xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
+       xprt->idle_timeout = XS_IDLE_DISC_TO;
 
        xprt->ops = &xs_tcp_ops;