spin_lock_bh(&xprt->sc_ctxt_lock);
if (ctxt) {
at_least_one = 1;
- ctxt->next = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt;
+ INIT_LIST_HEAD(&ctxt->free_list);
+ list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
} else {
/* kmalloc failed...give up for now */
xprt->sc_ctxt_cnt--;
while (1) {
spin_lock_bh(&xprt->sc_ctxt_lock);
- if (unlikely(xprt->sc_ctxt_head == NULL)) {
+ if (unlikely(list_empty(&xprt->sc_ctxt_free))) {
/* Try to bump my cache. */
spin_unlock_bh(&xprt->sc_ctxt_lock);
schedule_timeout_uninterruptible(msecs_to_jiffies(500));
continue;
}
- ctxt = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt->next;
+ ctxt = list_entry(xprt->sc_ctxt_free.next,
+ struct svc_rdma_op_ctxt,
+ free_list);
+ list_del_init(&ctxt->free_list);
spin_unlock_bh(&xprt->sc_ctxt_lock);
ctxt->xprt = xprt;
INIT_LIST_HEAD(&ctxt->dto_q);
ctxt->count = 0;
+ atomic_inc(&xprt->sc_ctxt_used);
break;
}
return ctxt;
put_page(ctxt->pages[i]);
for (i = 0; i < ctxt->count; i++)
- dma_unmap_single(xprt->sc_cm_id->device->dma_device,
- ctxt->sge[i].addr,
- ctxt->sge[i].length,
- ctxt->direction);
+ ib_dma_unmap_single(xprt->sc_cm_id->device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
+
spin_lock_bh(&xprt->sc_ctxt_lock);
- ctxt->next = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt;
+ list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
spin_unlock_bh(&xprt->sc_ctxt_lock);
+ atomic_dec(&xprt->sc_ctxt_used);
}
/* ib_cq event handler */
struct svcxprt_rdma *xprt = cq_context;
unsigned long flags;
+ /* Guard against unconditional flush call for destroyed QP */
+ if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+ return;
+
/*
* Set the bit regardless of whether or not it's on the list
* because it may be on the list already due to an SQ
* completion.
- */
+ */
set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
/*
*
* Take all completing WC off the CQE and enqueue the associated DTO
* context on the dto_q for the transport.
+ *
+ * Note that caller must hold a transport reference.
*/
static void rq_cq_reap(struct svcxprt_rdma *xprt)
{
ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
atomic_inc(&rdma_stat_rq_poll);
- spin_lock_bh(&xprt->sc_rq_dto_lock);
while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
ctxt->wc_status = wc.status;
ctxt->byte_len = wc.byte_len;
if (wc.status != IB_WC_SUCCESS) {
/* Close the transport */
+ dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 1);
+ svc_xprt_put(&xprt->sc_xprt);
continue;
}
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+ svc_xprt_put(&xprt->sc_xprt);
}
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
if (ctxt)
atomic_inc(&rdma_stat_rq_prod);
/*
* Send Queue Completion Handler - potentially called on interrupt context.
+ *
+ * Note that caller must hold a transport reference.
*/
static void sq_cq_reap(struct svcxprt_rdma *xprt)
{
case IB_WR_RDMA_READ:
if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
+ BUG_ON(!read_hdr);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
spin_lock_bh(&xprt->sc_read_complete_lock);
- list_add_tail(&ctxt->dto_q,
+ list_add_tail(&read_hdr->dto_q,
&xprt->sc_read_complete_q);
spin_unlock_bh(&xprt->sc_read_complete_lock);
svc_xprt_enqueue(&xprt->sc_xprt);
}
+ svc_rdma_put_context(ctxt, 0);
break;
default:
wc.opcode, wc.status);
break;
}
+ svc_xprt_put(&xprt->sc_xprt);
}
if (ctxt)
struct svcxprt_rdma *xprt = cq_context;
unsigned long flags;
+ /* Guard against unconditional flush call for destroyed QP */
+ if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+ return;
+
/*
* Set the bit regardless of whether or not it's on the list
* because it may be on the list already due to an RQ
* completion.
- */
+ */
set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
/*
xprt->sc_ctxt_max = ctxt_max;
xprt->sc_ctxt_bump = ctxt_bump;
xprt->sc_ctxt_cnt = 0;
- xprt->sc_ctxt_head = NULL;
+ atomic_set(&xprt->sc_ctxt_used, 0);
+
+ INIT_LIST_HEAD(&xprt->sc_ctxt_free);
for (i = 0; i < ctxt_count; i++) {
ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
if (ctxt) {
- ctxt->next = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt;
+ INIT_LIST_HEAD(&ctxt->free_list);
+ list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
xprt->sc_ctxt_cnt++;
}
}
}
-static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+static void destroy_context_cache(struct svcxprt_rdma *xprt)
{
- struct svc_rdma_op_ctxt *next;
- if (!ctxt)
- return;
-
- do {
- next = ctxt->next;
+ while (!list_empty(&xprt->sc_ctxt_free)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(xprt->sc_ctxt_free.next,
+ struct svc_rdma_op_ctxt,
+ free_list);
+ list_del_init(&ctxt->free_list);
kfree(ctxt);
- ctxt = next;
- } while (next);
+ }
}
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
reqs +
cma_xprt->sc_sq_depth +
RPCRDMA_MAX_THREADS + 1); /* max */
- if (!cma_xprt->sc_ctxt_head) {
+ if (list_empty(&cma_xprt->sc_ctxt_free)) {
kfree(cma_xprt);
return NULL;
}
recv_wr.num_sge = ctxt->count;
recv_wr.wr_id = (u64)(unsigned long)ctxt;
+ svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
- if (ret)
+ if (ret) {
+ svc_xprt_put(&xprt->sc_xprt);
svc_rdma_put_context(ctxt, 1);
+ }
return ret;
}
{
struct svcxprt_rdma *listen_xprt = new_cma_id->context;
struct svcxprt_rdma *newxprt;
+ struct sockaddr *sa;
/* Create a new transport */
newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
newxprt, newxprt->sc_cm_id, listen_xprt);
+ /* Set the local and remote addresses in the transport */
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+ svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+
/*
* Enqueue the new transport on the accept queue of the listening
* transport
cma_xprt = rdma_create_xprt(serv, 1);
if (!cma_xprt)
- return ERR_PTR(ENOMEM);
+ return ERR_PTR(-ENOMEM);
xprt = &cma_xprt->sc_xprt;
listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
if (IS_ERR(listen_id)) {
- svc_xprt_put(&cma_xprt->sc_xprt);
- dprintk("svcrdma: rdma_create_id failed = %ld\n",
- PTR_ERR(listen_id));
- return (void *)listen_id;
+ ret = PTR_ERR(listen_id);
+ dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
+ goto err0;
}
+
ret = rdma_bind_addr(listen_id, sa);
if (ret) {
- rdma_destroy_id(listen_id);
- svc_xprt_put(&cma_xprt->sc_xprt);
dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
- return ERR_PTR(ret);
+ goto err1;
}
cma_xprt->sc_cm_id = listen_id;
ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
if (ret) {
- rdma_destroy_id(listen_id);
- svc_xprt_put(&cma_xprt->sc_xprt);
dprintk("svcrdma: rdma_listen failed = %d\n", ret);
- return ERR_PTR(ret);
+ goto err1;
}
/*
svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
return &cma_xprt->sc_xprt;
+
+ err1:
+ rdma_destroy_id(listen_id);
+ err0:
+ kfree(cma_xprt);
+ return ERR_PTR(ret);
}
/*
struct rdma_conn_param conn_param;
struct ib_qp_init_attr qp_attr;
struct ib_device_attr devattr;
- struct sockaddr *sa;
int ret;
int i;
newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
}
- svc_xprt_get(&newxprt->sc_xprt);
newxprt->sc_qp = newxprt->sc_cm_id->qp;
/* Register all of physical memory */
/* Swap out the handler */
newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+ /*
+ * Arm the CQs for the SQ and RQ before accepting so we can't
+ * miss the first message
+ */
+ ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+
/* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
memset(&conn_param, 0, sizeof conn_param);
newxprt->sc_max_requests,
newxprt->sc_ord);
- /* Set the local and remote addresses in the transport */
- sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
- svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
- sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
- svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
-
- ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
return &newxprt->sc_xprt;
errout:
dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
/* Take a reference in case the DTO handler runs */
svc_xprt_get(&newxprt->sc_xprt);
- if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) {
+ if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
ib_destroy_qp(newxprt->sc_qp);
- svc_xprt_put(&newxprt->sc_xprt);
- }
rdma_destroy_id(newxprt->sc_cm_id);
/* This call to put will destroy the transport */
svc_xprt_put(&newxprt->sc_xprt);
}
/*
- * When connected, an svc_xprt has at least three references:
- *
- * - A reference held by the QP. We still hold that here because this
- * code deletes the QP and puts the reference.
+ * When connected, an svc_xprt has at least two references:
*
* - A reference held by the cm_id between the ESTABLISHED and
* DISCONNECTED events. If the remote peer disconnected first, this
* - A reference held by the svc_recv code that called this function
* as part of close processing.
*
- * At a minimum two references should still be held.
+ * At a minimum one references should still be held.
*/
static void svc_rdma_detach(struct svc_xprt *xprt)
{
/* Disconnect and flush posted WQE */
rdma_disconnect(rdma->sc_cm_id);
-
- /* Destroy the QP if present (not a listener) */
- if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
- ib_destroy_qp(rdma->sc_qp);
- svc_xprt_put(xprt);
- }
-
- /* Destroy the CM ID */
- rdma_destroy_id(rdma->sc_cm_id);
}
-static void svc_rdma_free(struct svc_xprt *xprt)
+static void __svc_rdma_free(struct work_struct *work)
{
- struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+ struct svcxprt_rdma *rdma =
+ container_of(work, struct svcxprt_rdma, sc_work);
dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+
/* We should only be called from kref_put */
- BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0);
+ BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
+
+ /*
+ * Destroy queued, but not processed read completions. Note
+ * that this cleanup has to be done before destroying the
+ * cm_id because the device ptr is needed to unmap the dma in
+ * svc_rdma_put_context.
+ */
+ spin_lock_bh(&rdma->sc_read_complete_lock);
+ while (!list_empty(&rdma->sc_read_complete_q)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(rdma->sc_read_complete_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ svc_rdma_put_context(ctxt, 1);
+ }
+ spin_unlock_bh(&rdma->sc_read_complete_lock);
+
+ /* Destroy queued, but not processed recv completions */
+ spin_lock_bh(&rdma->sc_rq_dto_lock);
+ while (!list_empty(&rdma->sc_rq_dto_q)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(rdma->sc_rq_dto_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ svc_rdma_put_context(ctxt, 1);
+ }
+ spin_unlock_bh(&rdma->sc_rq_dto_lock);
+
+ /* Warn if we leaked a resource or under-referenced */
+ WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
+
+ /* Destroy the QP if present (not a listener) */
+ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
+ ib_destroy_qp(rdma->sc_qp);
+
if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
ib_destroy_cq(rdma->sc_sq_cq);
if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
ib_dealloc_pd(rdma->sc_pd);
- destroy_context_cache(rdma->sc_ctxt_head);
+ /* Destroy the CM ID */
+ rdma_destroy_id(rdma->sc_cm_id);
+
+ destroy_context_cache(rdma);
kfree(rdma);
}
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ INIT_WORK(&rdma->sc_work, __svc_rdma_free);
+ schedule_work(&rdma->sc_work);
+}
+
static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
struct svcxprt_rdma *rdma =
continue;
}
/* Bumped used SQ WR count and post */
+ svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
if (!ret)
atomic_inc(&xprt->sc_sq_count);
- else
+ else {
+ svc_xprt_put(&xprt->sc_xprt);
dprintk("svcrdma: failed to post SQ WR rc=%d, "
"sc_sq_count=%d, sc_sq_depth=%d\n",
ret, atomic_read(&xprt->sc_sq_count),
xprt->sc_sq_depth);
+ }
spin_unlock_bh(&xprt->sc_lock);
break;
}
return ret;
}
-int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
- enum rpcrdma_errcode err)
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err)
{
struct ib_send_wr err_wr;
struct ib_sge sge;
/* Post It */
ret = svc_rdma_send(xprt, &err_wr);
if (ret) {
- dprintk("svcrdma: Error posting send = %d\n", ret);
+ dprintk("svcrdma: Error %d posting send for protocol error\n",
+ ret);
svc_rdma_put_context(ctxt, 1);
}
-
- return ret;
}