#include <linux/types.h>
#include <linux/slab.h>
+#include <linux/module.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/xprtsock.h>
#include <linux/file.h>
#include <net/sock.h>
return (struct sockaddr_in6 *) &xprt->addr;
}
-static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt)
+static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
+ const char *protocol,
+ const char *netid)
{
struct sockaddr_in *addr = xs_addr_in(xprt);
char *buf;
}
xprt->address_strings[RPC_DISPLAY_PORT] = buf;
- buf = kzalloc(8, GFP_KERNEL);
- if (buf) {
- if (xprt->prot == IPPROTO_UDP)
- snprintf(buf, 8, "udp");
- else
- snprintf(buf, 8, "tcp");
- }
- xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
+ xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
buf = kzalloc(48, GFP_KERNEL);
if (buf) {
snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
NIPQUAD(addr->sin_addr.s_addr),
ntohs(addr->sin_port),
- xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
+ protocol);
}
xprt->address_strings[RPC_DISPLAY_ALL] = buf;
ntohs(addr->sin_port));
}
xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
+
+ buf = kzalloc(30, GFP_KERNEL);
+ if (buf) {
+ snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
+ NIPQUAD(addr->sin_addr.s_addr),
+ ntohs(addr->sin_port) >> 8,
+ ntohs(addr->sin_port) & 0xff);
+ }
+ xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
+
+ xprt->address_strings[RPC_DISPLAY_NETID] = netid;
}
-static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt)
+static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
+ const char *protocol,
+ const char *netid)
{
struct sockaddr_in6 *addr = xs_addr_in6(xprt);
char *buf;
}
xprt->address_strings[RPC_DISPLAY_PORT] = buf;
- buf = kzalloc(8, GFP_KERNEL);
- if (buf) {
- if (xprt->prot == IPPROTO_UDP)
- snprintf(buf, 8, "udp");
- else
- snprintf(buf, 8, "tcp");
- }
- xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
+ xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
buf = kzalloc(64, GFP_KERNEL);
if (buf) {
snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
NIP6(addr->sin6_addr),
ntohs(addr->sin6_port),
- xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
+ protocol);
}
xprt->address_strings[RPC_DISPLAY_ALL] = buf;
ntohs(addr->sin6_port));
}
xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
+
+ buf = kzalloc(50, GFP_KERNEL);
+ if (buf) {
+ snprintf(buf, 50, NIP6_FMT".%u.%u",
+ NIP6(addr->sin6_addr),
+ ntohs(addr->sin6_port) >> 8,
+ ntohs(addr->sin6_port) & 0xff);
+ }
+ xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
+
+ xprt->address_strings[RPC_DISPLAY_NETID] = netid;
}
static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
- int i;
+ unsigned int i;
for (i = 0; i < RPC_DISPLAY_MAX; i++)
- kfree(xprt->address_strings[i]);
+ switch (i) {	/* kfree() every display string except the two slots below */
+ case RPC_DISPLAY_PROTO:
+ case RPC_DISPLAY_NETID:
+ continue;	/* these slots are assigned the const char * arguments
+		 * of xs_format_ipv4/ipv6_peer_addresses() (string
+		 * constants at the call sites in this patch) instead
+		 * of kzalloc()ed buffers, so they are not freed */
+ default:
+ kfree(xprt->address_strings[i]);
+ }
}
#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
dprintk("RPC: xs_udp_send_request(%u) = %d\n",
xdr->len - req->rq_bytes_sent, status);
- if (likely(status >= (int) req->rq_slen))
- return 0;
-
- /* Still some bytes left; set up for a retry later. */
- if (status > 0)
+ if (status >= 0) {
+ task->tk_bytes_sent += status;
+ if (status >= req->rq_slen)
+ return 0;
+ /* Still some bytes left; set up for a retry later. */
status = -EAGAIN;
+ }
switch (status) {
case -ENETUNREACH:
return status;
}
+/**
+ * xs_tcp_shutdown - begin an orderly close of a TCP transport's socket
+ * @xprt: transport whose socket is to be shut down
+ *
+ * Calls kernel_sock_shutdown() with SHUT_WR on the socket attached to
+ * the transport.  Does nothing when the transport has no socket.
+ */
+static void xs_tcp_shutdown(struct rpc_xprt *xprt)
+{
+	struct socket *tcp_sock;
+
+	/* Read the socket pointer once; it may be NULL. */
+	tcp_sock = container_of(xprt, struct sock_xprt, xprt)->sock;
+	if (tcp_sock == NULL)
+		return;
+
+	kernel_sock_shutdown(tcp_sock, SHUT_WR);
+}
+
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
u32 reclen = buf->len - sizeof(rpc_fraghdr);
default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
- xprt_disconnect(xprt);
+ xs_tcp_shutdown(xprt);
break;
}
clear_close_wait:
smp_mb__before_clear_bit();
clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+ clear_bit(XPRT_CLOSING, &xprt->state);
smp_mb__after_clear_bit();
+ xprt_disconnect_done(xprt);
}
/**
cancel_rearming_delayed_work(&transport->connect_worker);
- xprt_disconnect(xprt);
xs_close(xprt);
xs_free_peer_addresses(xprt);
kfree(xprt->slot);
kfree(xprt);
+ module_put(THIS_MODULE);
}
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
copied = repsize;
/* Suck it into the iovec, verify checksum if not done by hw. */
- if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
+ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
+ UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
goto out_unlock;
+ }
+
+ UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
/* Something worked... */
dst_confirm(skb->dst);
/* Sanity check of the record length */
if (unlikely(transport->tcp_reclen < 4)) {
dprintk("RPC: invalid TCP record fragment length\n");
- xprt_disconnect(xprt);
+ xprt_force_disconnect(xprt);
return;
}
dprintk("RPC: reading TCP record fragment of length %d\n",
transport->tcp_flags =
TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
- xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
xprt_wake_pending_tasks(xprt, 0);
}
spin_unlock_bh(&xprt->transport_lock);
break;
- case TCP_SYN_SENT:
- case TCP_SYN_RECV:
+ case TCP_FIN_WAIT1:
+ /* The client initiated a shutdown of the socket */
+ xprt->reestablish_timeout = 0;
+ set_bit(XPRT_CLOSING, &xprt->state);
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_CONNECTED, &xprt->state);
+ clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+ smp_mb__after_clear_bit();
break;
case TCP_CLOSE_WAIT:
- /* Try to schedule an autoclose RPC calls */
- set_bit(XPRT_CLOSE_WAIT, &xprt->state);
- if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
- queue_work(rpciod_workqueue, &xprt->task_cleanup);
- default:
- xprt_disconnect(xprt);
+ /* The server initiated a shutdown of the socket */
+ set_bit(XPRT_CLOSING, &xprt->state);
+ xprt_force_disconnect(xprt);
+ case TCP_SYN_SENT:
+ case TCP_CLOSING:
+ /*
+ * If the server closed down the connection, make sure that
+ * we back off before reconnecting
+ */
+ if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
+ xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
+ break;
+ case TCP_LAST_ACK:
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_CONNECTED, &xprt->state);
+ smp_mb__after_clear_bit();
+ break;
+ case TCP_CLOSE:
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+ clear_bit(XPRT_CLOSING, &xprt->state);
+ smp_mb__after_clear_bit();
+ /* Mark transport as closed and wake up all pending tasks */
+ xprt_disconnect_done(xprt);
}
out:
read_unlock(&sk->sk_callback_lock);
}
}
+/*
+ * Pick the first source port to try when binding @transport's socket:
+ * the port already recorded in the transport if it is non-zero,
+ * otherwise whatever xs_get_random_port() returns when the transport's
+ * resvport flag is set, otherwise 0.  @sock is not used here.
+ */
+static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
+{
+	unsigned short recorded = transport->port;
+
+	if (recorded != 0 || !transport->xprt.resvport)
+		return recorded;
+
+	return xs_get_random_port();
+}
+
+/*
+ * Choose the port to try after a bind attempt on @port did not succeed.
+ *
+ * Any port recorded in the transport is cleared first.  When the
+ * transport's resvport flag is clear the result is 0.  Otherwise the
+ * result is @port - 1 while @port lies in
+ * (xprt_min_resvport, xprt_max_resvport], and xprt_max_resvport when
+ * @port is at or below the minimum or above the maximum.
+ * @sock is not used here.
+ */
+static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
+{
+	if (transport->port != 0)
+		transport->port = 0;
+
+	if (transport->xprt.resvport) {
+		/* Step down inside the range, or wrap to the top of it. */
+		if (port > xprt_min_resvport && port <= xprt_max_resvport)
+			return port - 1;
+		return xprt_max_resvport;
+	}
+
+	return 0;
+}
+
static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
{
struct sockaddr_in myaddr = {
.sin_family = AF_INET,
};
struct sockaddr_in *sa;
- int err;
- unsigned short port = transport->port;
+ int err, nloop = 0;
+ unsigned short port = xs_get_srcport(transport, sock);
+ unsigned short last;
- if (!transport->xprt.resvport)
- port = 0;
sa = (struct sockaddr_in *)&transport->addr;
myaddr.sin_addr = sa->sin_addr;
do {
myaddr.sin_port = htons(port);
err = kernel_bind(sock, (struct sockaddr *) &myaddr,
sizeof(myaddr));
- if (!transport->xprt.resvport)
+ if (port == 0)
break;
if (err == 0) {
transport->port = port;
break;
}
- if (port <= xprt_min_resvport)
- port = xprt_max_resvport;
- else
- port--;
- } while (err == -EADDRINUSE && port != transport->port);
+ last = port;
+ port = xs_next_srcport(transport, sock, port);
+ if (port > last)
+ nloop++;
+ } while (err == -EADDRINUSE && nloop != 2);
dprintk("RPC: %s "NIPQUAD_FMT":%u: %s (%d)\n",
__FUNCTION__, NIPQUAD(myaddr.sin_addr),
port, err ? "failed" : "ok", err);
.sin6_family = AF_INET6,
};
struct sockaddr_in6 *sa;
- int err;
- unsigned short port = transport->port;
+ int err, nloop = 0;
+ unsigned short port = xs_get_srcport(transport, sock);
+ unsigned short last;
- if (!transport->xprt.resvport)
- port = 0;
sa = (struct sockaddr_in6 *)&transport->addr;
myaddr.sin6_addr = sa->sin6_addr;
do {
myaddr.sin6_port = htons(port);
err = kernel_bind(sock, (struct sockaddr *) &myaddr,
sizeof(myaddr));
- if (!transport->xprt.resvport)
+ if (port == 0)
break;
if (err == 0) {
transport->port = port;
break;
}
- if (port <= xprt_min_resvport)
- port = xprt_max_resvport;
- else
- port--;
- } while (err == -EADDRINUSE && port != transport->port);
+ last = port;
+ port = xs_next_srcport(transport, sock, port);
+ if (port > last)
+ nloop++;
+ } while (err == -EADDRINUSE && nloop != 2);
dprintk("RPC: xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
return err;
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];
-static inline void xs_reclassify_socket(struct socket *sock)
+static inline void xs_reclassify_socket4(struct socket *sock)	/* AF_INET-only
+	 * variant: the sk_family switch removed below is replaced by
+	 * separate ...4() and ...6() functions chosen by the caller */
{
struct sock *sk = sock->sk;
- BUG_ON(sk->sk_lock.owner != NULL);
- switch (sk->sk_family) {
- case AF_INET:
- sock_lock_init_class_and_name(sk, "slock-AF_INET-NFS",
- &xs_slock_key[0], "sk_lock-AF_INET-NFS", &xs_key[0]);
- break;
- case AF_INET6:
- sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFS",
- &xs_slock_key[1], "sk_lock-AF_INET6-NFS", &xs_key[1]);
- break;
+ BUG_ON(sock_owned_by_user(sk));	/* BUG if the socket is owned by a user context here */
+ sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
+ &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);	/* element 0 of
+	 * both key arrays goes with the AF_INET names; the names now
+	 * end in -RPC where the removed code used -NFS */
+}
- default:
- BUG();
- }
+static inline void xs_reclassify_socket6(struct socket *sock)	/* AF_INET6
+	 * counterpart of xs_reclassify_socket4() */
+{
+ struct sock *sk = sock->sk;
+
+ BUG_ON(sock_owned_by_user(sk));	/* BUG if the socket is owned by a user context here */
+ sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
+ &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);	/* element 1 of
+	 * both key arrays goes with the AF_INET6 names */
}
#else
-static inline void xs_reclassify_socket(struct socket *sock)
+static inline void xs_reclassify_socket4(struct socket *sock)
+{	/* #else side: both reclassify helpers are deliberately empty here.
+	 * NOTE(review): the condition of the enclosing #if is outside
+	 * this excerpt - confirm which config option selects it. */
+}
+
+static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif
dprintk("RPC: can't create UDP transport socket (%d).\n", -err);
goto out;
}
- xs_reclassify_socket(sock);
+ xs_reclassify_socket4(sock);
if (xs_bind4(transport, sock)) {
sock_release(sock);
dprintk("RPC: can't create UDP transport socket (%d).\n", -err);
goto out;
}
- xs_reclassify_socket(sock);
+ xs_reclassify_socket6(sock);
if (xs_bind6(transport, sock) < 0) {
sock_release(sock);
dprintk("RPC: can't create TCP transport socket (%d).\n", -err);
goto out;
}
- xs_reclassify_socket(sock);
+ xs_reclassify_socket4(sock);
if (xs_bind4(transport, sock) < 0) {
sock_release(sock);
break;
default:
/* get rid of existing socket, and retry */
- xs_close(xprt);
- break;
+ xs_tcp_shutdown(xprt);
}
}
out:
dprintk("RPC: can't create TCP transport socket (%d).\n", -err);
goto out;
}
- xs_reclassify_socket(sock);
+ xs_reclassify_socket6(sock);
if (xs_bind6(transport, sock) < 0) {
sock_release(sock);
break;
default:
/* get rid of existing socket, and retry */
- xs_close(xprt);
- break;
+ xs_tcp_shutdown(xprt);
}
}
out:
}
}
+/*
+ * Connect method installed in the TCP transport ops.
+ *
+ * If the transport is still flagged XPRT_CONNECTED, a shutdown of its
+ * socket is started first.  If XPRT_CLOSING is set after that, the
+ * function returns without connecting; otherwise the task is handed to
+ * xs_connect().
+ */
+static void xs_tcp_connect(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = task->tk_xprt;
+
+	if (test_bit(XPRT_CONNECTED, &xprt->state))
+		xs_tcp_shutdown(xprt);
+
+	/* Only connect once no socket shutdown is pending. */
+	if (!test_bit(XPRT_CLOSING, &xprt->state))
+		xs_connect(task);
+}
+
/**
* xs_udp_print_stats - display UDP socket-specifc stats
* @xprt: rpc_xprt struct containing statistics
.release_xprt = xs_tcp_release_xprt,
.rpcbind = rpcb_getport_async,
.set_port = xs_set_port,
- .connect = xs_connect,
+ .connect = xs_tcp_connect,
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.send_request = xs_tcp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def,
- .close = xs_close,
+ .close = xs_tcp_shutdown,
.destroy = xs_destroy,
.print_stats = xs_tcp_print_stats,
};
-static struct rpc_xprt *xs_setup_xprt(struct rpc_xprtsock_create *args, unsigned int slot_table_size)
+static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
+ unsigned int slot_table_size)
{
struct rpc_xprt *xprt;
struct sock_xprt *new;
xprt->addrlen = args->addrlen;
if (args->srcaddr)
memcpy(&new->addr, args->srcaddr, args->addrlen);
- new->port = xs_get_random_port();
return xprt;
}
+static const struct rpc_timeout xs_udp_default_timeout = {	/* xs_setup_udp() points xprt->timeout at this */
+ .to_initval = 5 * HZ,	/* 5 * HZ jiffies, i.e. five seconds */
+ .to_maxval = 30 * HZ,	/* 30 * HZ jiffies, i.e. thirty seconds */
+ .to_increment = 5 * HZ,	/* 5 * HZ jiffies; presumably the step added per
+			 * retry - confirm against struct rpc_timeout */
+ .to_retries = 5,	/* matches the 5 passed to the removed xprt_set_timeout() call */
+};
+
/**
* xs_setup_udp - Set up transport to use a UDP socket
* @args: rpc transport creation arguments
*
*/
-struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
+static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
{
struct sockaddr *addr = args->dstaddr;
struct rpc_xprt *xprt;
xprt->ops = &xs_udp_ops;
- if (args->timeout)
- xprt->timeout = *args->timeout;
- else
- xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
+ xprt->timeout = &xs_udp_default_timeout;
switch (addr->sa_family) {
case AF_INET:
INIT_DELAYED_WORK(&transport->connect_worker,
xs_udp_connect_worker4);
- xs_format_ipv4_peer_addresses(xprt);
+ xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
break;
case AF_INET6:
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
INIT_DELAYED_WORK(&transport->connect_worker,
xs_udp_connect_worker6);
- xs_format_ipv6_peer_addresses(xprt);
+ xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
break;
default:
kfree(xprt);
dprintk("RPC: set up transport to address %s\n",
xprt->address_strings[RPC_DISPLAY_ALL]);
- return xprt;
+ if (try_module_get(THIS_MODULE))
+ return xprt;
+
+ kfree(xprt->slot);
+ kfree(xprt);
+ return ERR_PTR(-EINVAL);
}
+static const struct rpc_timeout xs_tcp_default_timeout = {	/* xs_setup_tcp() points xprt->timeout at this */
+ .to_initval = 60 * HZ,	/* 60 * HZ jiffies, i.e. sixty seconds */
+ .to_maxval = 60 * HZ,	/* same value as to_initval */
+ .to_retries = 2,	/* to_increment is not listed, so it is zero-initialised */
+};
+
/**
* xs_setup_tcp - Set up transport to use a TCP socket
* @args: rpc transport creation arguments
*
*/
-struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args)
+static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
{
struct sockaddr *addr = args->dstaddr;
struct rpc_xprt *xprt;
xprt->idle_timeout = XS_IDLE_DISC_TO;
xprt->ops = &xs_tcp_ops;
-
- if (args->timeout)
- xprt->timeout = *args->timeout;
- else
- xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
+ xprt->timeout = &xs_tcp_default_timeout;
switch (addr->sa_family) {
case AF_INET:
xprt_set_bound(xprt);
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
- xs_format_ipv4_peer_addresses(xprt);
+ xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
break;
case AF_INET6:
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
xprt_set_bound(xprt);
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
- xs_format_ipv6_peer_addresses(xprt);
+ xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
break;
default:
kfree(xprt);
dprintk("RPC: set up transport to address %s\n",
xprt->address_strings[RPC_DISPLAY_ALL]);
- return xprt;
+ if (try_module_get(THIS_MODULE))
+ return xprt;
+
+ kfree(xprt->slot);
+ kfree(xprt);
+ return ERR_PTR(-EINVAL);
}
+static struct xprt_class xs_udp_transport = {	/* passed to xprt_register_transport()
+	 * in init_socket_xprt() and to xprt_unregister_transport() in
+	 * cleanup_socket_xprt() */
+ .list = LIST_HEAD_INIT(xs_udp_transport.list),
+ .name = "udp",
+ .owner = THIS_MODULE,	/* xs_setup_udp() takes a reference on this module
+			 * with try_module_get() before returning a transport */
+ .ident = IPPROTO_UDP,
+ .setup = xs_setup_udp,	/* made static by this patch; referenced through this pointer */
+};
+
+static struct xprt_class xs_tcp_transport = {	/* passed to xprt_register_transport()
+	 * in init_socket_xprt() and to xprt_unregister_transport() in
+	 * cleanup_socket_xprt() */
+ .list = LIST_HEAD_INIT(xs_tcp_transport.list),
+ .name = "tcp",
+ .owner = THIS_MODULE,	/* xs_setup_tcp() takes a reference on this module
+			 * with try_module_get() before returning a transport */
+ .ident = IPPROTO_TCP,
+ .setup = xs_setup_tcp,	/* made static by this patch; referenced through this pointer */
+};
+
/**
- * init_socket_xprt - set up xprtsock's sysctls
+ * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
*
*/
int init_socket_xprt(void)
sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
+ xprt_register_transport(&xs_udp_transport);
+ xprt_register_transport(&xs_tcp_transport);
+
return 0;
}
/**
- * cleanup_socket_xprt - remove xprtsock's sysctls
+ * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
*
*/
void cleanup_socket_xprt(void)
sunrpc_table_header = NULL;
}
#endif
+
+ xprt_unregister_transport(&xs_udp_transport);
+ xprt_unregister_transport(&xs_tcp_transport);
}