]> err.no Git - linux-2.6/blobdiff - drivers/infiniband/ulp/ipoib/ipoib_cm.c
IPoIB/cm: Fix SRQ WR leak
[linux-2.6] / drivers / infiniband / ulp / ipoib / ipoib_cm.c
index e70492db74f636eadd46a52bbc951a33e85e09bc..ffec794b7913d6470bc2f3409a9fadd93c446cb2 100644 (file)
@@ -37,6 +37,7 @@
 #include <net/dst.h>
 #include <net/icmp.h>
 #include <linux/icmpv6.h>
+#include <linux/delay.h>
 
 #ifdef CONFIG_INFINIBAND_IPOIB_DEBUG_DATA
 static int data_debug_level;
@@ -62,6 +63,16 @@ struct ipoib_cm_id {
        u32 remote_mtu;
 };
 
+static struct ib_qp_attr ipoib_cm_err_attr = {
+       .qp_state = IB_QPS_ERR
+};
+
+#define IPOIB_CM_RX_DRAIN_WRID 0x7fffffff
+
+static struct ib_recv_wr ipoib_cm_rx_drain_wr = {
+       .wr_id = IPOIB_CM_RX_DRAIN_WRID
+};
+
 static int ipoib_cm_tx_handler(struct ib_cm_id *cm_id,
                               struct ib_cm_event *event);
 
@@ -131,7 +142,7 @@ static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int
                skb_fill_page_desc(skb, i, page, 0, PAGE_SIZE);
 
                mapping[i + 1] = ib_dma_map_page(priv->ca, skb_shinfo(skb)->frags[i].page,
-                                                0, PAGE_SIZE, DMA_TO_DEVICE);
+                                                0, PAGE_SIZE, DMA_FROM_DEVICE);
                if (unlikely(ib_dma_mapping_error(priv->ca, mapping[i + 1])))
                        goto partial_error;
        }
@@ -150,11 +161,44 @@ partial_error:
        return NULL;
 }
 
+static void ipoib_cm_start_rx_drain(struct ipoib_dev_priv* priv)
+{
+       struct ib_recv_wr *bad_wr;
+
+       /* rx_drain_qp send queue depth is 1, so
+        * make sure we have at most 1 outstanding WR. */
+       if (list_empty(&priv->cm.rx_flush_list) ||
+           !list_empty(&priv->cm.rx_drain_list))
+               return;
+
+       if (ib_post_recv(priv->cm.rx_drain_qp, &ipoib_cm_rx_drain_wr, &bad_wr))
+               ipoib_warn(priv, "failed to post rx_drain wr\n");
+
+       list_splice_init(&priv->cm.rx_flush_list, &priv->cm.rx_drain_list);
+}
+
+static void ipoib_cm_rx_event_handler(struct ib_event *event, void *ctx)
+{
+       struct ipoib_cm_rx *p = ctx;
+       struct ipoib_dev_priv *priv = netdev_priv(p->dev);
+       unsigned long flags;
+
+       if (event->event != IB_EVENT_QP_LAST_WQE_REACHED)
+               return;
+
+       spin_lock_irqsave(&priv->lock, flags);
+       list_move(&p->list, &priv->cm.rx_flush_list);
+       p->state = IPOIB_CM_RX_FLUSH;
+       ipoib_cm_start_rx_drain(priv);
+       spin_unlock_irqrestore(&priv->lock, flags);
+}
+
 static struct ib_qp *ipoib_cm_create_rx_qp(struct net_device *dev,
                                           struct ipoib_cm_rx *p)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
        struct ib_qp_init_attr attr = {
+               .event_handler = ipoib_cm_rx_event_handler,
                .send_cq = priv->cq, /* does not matter, we never send anything */
                .recv_cq = priv->cq,
                .srq = priv->cm.srq,
@@ -228,7 +272,6 @@ static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
        struct net_device *dev = cm_id->context;
        struct ipoib_dev_priv *priv = netdev_priv(dev);
        struct ipoib_cm_rx *p;
-       unsigned long flags;
        unsigned psn;
        int ret;
 
@@ -257,11 +300,13 @@ static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
 
        cm_id->context = p;
        p->jiffies = jiffies;
-       spin_lock_irqsave(&priv->lock, flags);
+       p->state = IPOIB_CM_RX_LIVE;
+       spin_lock_irq(&priv->lock);
+       if (list_empty(&priv->cm.passive_ids))
+               queue_delayed_work(ipoib_workqueue,
+                                  &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
        list_add(&p->list, &priv->cm.passive_ids);
-       spin_unlock_irqrestore(&priv->lock, flags);
-       queue_delayed_work(ipoib_workqueue,
-                          &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
+       spin_unlock_irq(&priv->lock);
        return 0;
 
 err_rep:
@@ -277,8 +322,6 @@ static int ipoib_cm_rx_handler(struct ib_cm_id *cm_id,
 {
        struct ipoib_cm_rx *p;
        struct ipoib_dev_priv *priv;
-       unsigned long flags;
-       int ret;
 
        switch (event->event) {
        case IB_CM_REQ_RECEIVED:
@@ -290,20 +333,9 @@ static int ipoib_cm_rx_handler(struct ib_cm_id *cm_id,
        case IB_CM_REJ_RECEIVED:
                p = cm_id->context;
                priv = netdev_priv(p->dev);
-               spin_lock_irqsave(&priv->lock, flags);
-               if (list_empty(&p->list))
-                       ret = 0; /* Connection is going away already. */
-               else {
-                       list_del_init(&p->list);
-                       ret = -ECONNRESET;
-               }
-               spin_unlock_irqrestore(&priv->lock, flags);
-               if (ret) {
-                       ib_destroy_qp(p->qp);
-                       kfree(p);
-                       return ret;
-               }
-               return 0;
+               if (ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE))
+                       ipoib_warn(priv, "unable to move qp to error state\n");
+               /* Fall through */
        default:
                return 0;
        }
@@ -351,12 +383,19 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
        u64 mapping[IPOIB_CM_RX_SG];
        int frags;
 
-       ipoib_dbg_data(priv, "cm recv completion: id %d, op %d, status: %d\n",
-                      wr_id, wc->opcode, wc->status);
+       ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
+                      wr_id, wc->status);
 
        if (unlikely(wr_id >= ipoib_recvq_size)) {
-               ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
-                          wr_id, ipoib_recvq_size);
+               if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_SRQ)) {
+                       spin_lock_irqsave(&priv->lock, flags);
+                       list_splice_init(&priv->cm.rx_drain_list, &priv->cm.rx_reap_list);
+                       ipoib_cm_start_rx_drain(priv);
+                       queue_work(ipoib_workqueue, &priv->cm.rx_reap_task);
+                       spin_unlock_irqrestore(&priv->lock, flags);
+               } else
+                       ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
+                                  wr_id, ipoib_recvq_size);
                return;
        }
 
@@ -372,16 +411,14 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
 
        if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
                p = wc->qp->qp_context;
-               if (time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+               if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
                        spin_lock_irqsave(&priv->lock, flags);
                        p->jiffies = jiffies;
-                       /* Move this entry to list head, but do
-                        * not re-add it if it has been removed. */
-                       if (!list_empty(&p->list))
+                       /* Move this entry to list head, but do not re-add it
+                        * if it has been moved out of list. */
+                       if (p->state == IPOIB_CM_RX_LIVE)
                                list_move(&p->list, &priv->cm.passive_ids);
                        spin_unlock_irqrestore(&priv->lock, flags);
-                       queue_delayed_work(ipoib_workqueue,
-                                          &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
                }
        }
 
@@ -408,7 +445,7 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
        skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
 
        skb->protocol = ((struct ipoib_header *) skb->data)->proto;
-       skb->mac.raw = skb->data;
+       skb_reset_mac_header(skb);
        skb_pull(skb, IPOIB_ENCAP_LEN);
 
        dev->last_rx = jiffies;
@@ -418,7 +455,7 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
        skb->dev = dev;
        /* XXX get correct PACKET_ type here */
        skb->pkt_type = PACKET_HOST;
-       netif_rx_ni(skb);
+       netif_receive_skb(skb);
 
 repost:
        if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
@@ -504,8 +541,8 @@ static void ipoib_cm_handle_tx_wc(struct net_device *dev, struct ipoib_cm_tx *tx
        struct ipoib_tx_buf *tx_req;
        unsigned long flags;
 
-       ipoib_dbg_data(priv, "cm send completion: id %d, op %d, status: %d\n",
-                      wr_id, wc->opcode, wc->status);
+       ipoib_dbg_data(priv, "cm send completion: id %d, status: %d\n",
+                      wr_id, wc->status);
 
        if (unlikely(wr_id >= ipoib_sendq_size)) {
                ipoib_warn(priv, "cm send completion event with wrid %d (> %d)\n",
@@ -586,15 +623,43 @@ static void ipoib_cm_tx_completion(struct ib_cq *cq, void *tx_ptr)
 int ipoib_cm_dev_open(struct net_device *dev)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
+       struct ib_qp_init_attr qp_init_attr = {
+               .send_cq = priv->cq,   /* does not matter, we never send anything */
+               .recv_cq = priv->cq,
+               .cap.max_send_wr = 1,  /* FIXME: 0 Seems not to work */
+               .cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
+               .cap.max_recv_wr = 1,
+               .cap.max_recv_sge = 1, /* FIXME: 0 Seems not to work */
+               .sq_sig_type = IB_SIGNAL_ALL_WR,
+               .qp_type = IB_QPT_UC,
+       };
        int ret;
 
        if (!IPOIB_CM_SUPPORTED(dev->dev_addr))
                return 0;
 
+       priv->cm.rx_drain_qp = ib_create_qp(priv->pd, &qp_init_attr);
+       if (IS_ERR(priv->cm.rx_drain_qp)) {
+               printk(KERN_WARNING "%s: failed to create CM ID\n", priv->ca->name);
+               ret = PTR_ERR(priv->cm.rx_drain_qp);
+               return ret;
+       }
+
+       /*
+        * We put the QP in error state directly.  This way, a "flush
+        * error" WC will be immediately generated for each WR we post.
+        */
+       ret = ib_modify_qp(priv->cm.rx_drain_qp, &ipoib_cm_err_attr, IB_QP_STATE);
+       if (ret) {
+               ipoib_warn(priv, "failed to modify drain QP to error: %d\n", ret);
+               goto err_qp;
+       }
+
        priv->cm.id = ib_create_cm_id(priv->ca, ipoib_cm_rx_handler, dev);
        if (IS_ERR(priv->cm.id)) {
                printk(KERN_WARNING "%s: failed to create CM ID\n", priv->ca->name);
-               return IS_ERR(priv->cm.id);
+               ret = PTR_ERR(priv->cm.id);
+               goto err_cm;
        }
 
        ret = ib_cm_listen(priv->cm.id, cpu_to_be64(IPOIB_CM_IETF_ID | priv->qp->qp_num),
@@ -602,34 +667,79 @@ int ipoib_cm_dev_open(struct net_device *dev)
        if (ret) {
                printk(KERN_WARNING "%s: failed to listen on ID 0x%llx\n", priv->ca->name,
                       IPOIB_CM_IETF_ID | priv->qp->qp_num);
-               ib_destroy_cm_id(priv->cm.id);
-               return ret;
+               goto err_listen;
        }
+
        return 0;
+
+err_listen:
+       ib_destroy_cm_id(priv->cm.id);
+err_cm:
+       priv->cm.id = NULL;
+err_qp:
+       ib_destroy_qp(priv->cm.rx_drain_qp);
+       return ret;
 }
 
 void ipoib_cm_dev_stop(struct net_device *dev)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
-       struct ipoib_cm_rx *p;
-       unsigned long flags;
+       struct ipoib_cm_rx *p, *n;
+       unsigned long begin;
+       LIST_HEAD(list);
+       int ret;
 
-       if (!IPOIB_CM_SUPPORTED(dev->dev_addr))
+       if (!IPOIB_CM_SUPPORTED(dev->dev_addr) || !priv->cm.id)
                return;
 
        ib_destroy_cm_id(priv->cm.id);
-       spin_lock_irqsave(&priv->lock, flags);
+       priv->cm.id = NULL;
+
+       spin_lock_irq(&priv->lock);
        while (!list_empty(&priv->cm.passive_ids)) {
                p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
-               list_del_init(&p->list);
-               spin_unlock_irqrestore(&priv->lock, flags);
+               list_move(&p->list, &priv->cm.rx_error_list);
+               p->state = IPOIB_CM_RX_ERROR;
+               spin_unlock_irq(&priv->lock);
+               ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+               if (ret)
+                       ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+               spin_lock_irq(&priv->lock);
+       }
+
+       /* Wait for all RX to be drained */
+       begin = jiffies;
+
+       while (!list_empty(&priv->cm.rx_error_list) ||
+              !list_empty(&priv->cm.rx_flush_list) ||
+              !list_empty(&priv->cm.rx_drain_list)) {
+               if (!time_after(jiffies, begin + 5 * HZ)) {
+                       ipoib_warn(priv, "RX drain timing out\n");
+
+                       /*
+                        * assume the HW is wedged and just free up everything.
+                        */
+                       list_splice_init(&priv->cm.rx_flush_list, &list);
+                       list_splice_init(&priv->cm.rx_error_list, &list);
+                       list_splice_init(&priv->cm.rx_drain_list, &list);
+                       break;
+               }
+               spin_unlock_irq(&priv->lock);
+               msleep(1);
+               spin_lock_irq(&priv->lock);
+       }
+
+       list_splice_init(&priv->cm.rx_reap_list, &list);
+
+       spin_unlock_irq(&priv->lock);
+
+       list_for_each_entry_safe(p, n, &list, list) {
                ib_destroy_cm_id(p->id);
                ib_destroy_qp(p->qp);
                kfree(p);
-               spin_lock_irqsave(&priv->lock, flags);
        }
-       spin_unlock_irqrestore(&priv->lock, flags);
 
+       ib_destroy_qp(priv->cm.rx_drain_qp);
        cancel_delayed_work(&priv->cm.stale_task);
 }
 
@@ -642,7 +752,6 @@ static int ipoib_cm_rep_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
        struct ib_qp_attr qp_attr;
        int qp_attr_mask, ret;
        struct sk_buff *skb;
-       unsigned long flags;
 
        p->mtu = be32_to_cpu(data->mtu);
 
@@ -680,12 +789,12 @@ static int ipoib_cm_rep_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
 
        skb_queue_head_init(&skqueue);
 
-       spin_lock_irqsave(&priv->lock, flags);
+       spin_lock_irq(&priv->lock);
        set_bit(IPOIB_FLAG_OPER_UP, &p->flags);
        if (p->neigh)
                while ((skb = __skb_dequeue(&p->neigh->queue)))
                        __skb_queue_tail(&skqueue, skb);
-       spin_unlock_irqrestore(&priv->lock, flags);
+       spin_unlock_irq(&priv->lock);
 
        while ((skb = __skb_dequeue(&skqueue))) {
                skb->dev = p->dev;
@@ -793,7 +902,7 @@ static int ipoib_cm_tx_init(struct ipoib_cm_tx *p, u32 qpn,
        }
 
        p->cq = ib_create_cq(priv->ca, ipoib_cm_tx_completion, NULL, p,
-                            ipoib_sendq_size + 1);
+                            ipoib_sendq_size + 1, 0);
        if (IS_ERR(p->cq)) {
                ret = PTR_ERR(p->cq);
                ipoib_warn(priv, "failed to allocate tx cq: %d\n", ret);
@@ -895,7 +1004,6 @@ static int ipoib_cm_tx_handler(struct ib_cm_id *cm_id,
        struct ipoib_dev_priv *priv = netdev_priv(tx->dev);
        struct net_device *dev = priv->dev;
        struct ipoib_neigh *neigh;
-       unsigned long flags;
        int ret;
 
        switch (event->event) {
@@ -914,7 +1022,7 @@ static int ipoib_cm_tx_handler(struct ib_cm_id *cm_id,
        case IB_CM_REJ_RECEIVED:
        case IB_CM_TIMEWAIT_EXIT:
                ipoib_dbg(priv, "CM error %d.\n", event->event);
-               spin_lock_irqsave(&priv->tx_lock, flags);
+               spin_lock_irq(&priv->tx_lock);
                spin_lock(&priv->lock);
                neigh = tx->neigh;
 
@@ -934,7 +1042,7 @@ static int ipoib_cm_tx_handler(struct ib_cm_id *cm_id,
                }
 
                spin_unlock(&priv->lock);
-               spin_unlock_irqrestore(&priv->tx_lock, flags);
+               spin_unlock_irq(&priv->tx_lock);
                break;
        default:
                break;
@@ -1023,21 +1131,20 @@ static void ipoib_cm_tx_reap(struct work_struct *work)
        struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
                                                   cm.reap_task);
        struct ipoib_cm_tx *p;
-       unsigned long flags;
 
-       spin_lock_irqsave(&priv->tx_lock, flags);
+       spin_lock_irq(&priv->tx_lock);
        spin_lock(&priv->lock);
        while (!list_empty(&priv->cm.reap_list)) {
                p = list_entry(priv->cm.reap_list.next, typeof(*p), list);
                list_del(&p->list);
                spin_unlock(&priv->lock);
-               spin_unlock_irqrestore(&priv->tx_lock, flags);
+               spin_unlock_irq(&priv->tx_lock);
                ipoib_cm_tx_destroy(p);
-               spin_lock_irqsave(&priv->tx_lock, flags);
+               spin_lock_irq(&priv->tx_lock);
                spin_lock(&priv->lock);
        }
        spin_unlock(&priv->lock);
-       spin_unlock_irqrestore(&priv->tx_lock, flags);
+       spin_unlock_irq(&priv->tx_lock);
 }
 
 static void ipoib_cm_skb_reap(struct work_struct *work)
@@ -1046,15 +1153,14 @@ static void ipoib_cm_skb_reap(struct work_struct *work)
                                                   cm.skb_task);
        struct net_device *dev = priv->dev;
        struct sk_buff *skb;
-       unsigned long flags;
 
        unsigned mtu = priv->mcast_mtu;
 
-       spin_lock_irqsave(&priv->tx_lock, flags);
+       spin_lock_irq(&priv->tx_lock);
        spin_lock(&priv->lock);
        while ((skb = skb_dequeue(&priv->cm.skb_queue))) {
                spin_unlock(&priv->lock);
-               spin_unlock_irqrestore(&priv->tx_lock, flags);
+               spin_unlock_irq(&priv->tx_lock);
                if (skb->protocol == htons(ETH_P_IP))
                        icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED, htonl(mtu));
 #if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
@@ -1062,11 +1168,11 @@ static void ipoib_cm_skb_reap(struct work_struct *work)
                        icmpv6_send(skb, ICMPV6_PKT_TOOBIG, 0, mtu, dev);
 #endif
                dev_kfree_skb_any(skb);
-               spin_lock_irqsave(&priv->tx_lock, flags);
+               spin_lock_irq(&priv->tx_lock);
                spin_lock(&priv->lock);
        }
        spin_unlock(&priv->lock);
-       spin_unlock_irqrestore(&priv->tx_lock, flags);
+       spin_unlock_irq(&priv->tx_lock);
 }
 
 void ipoib_cm_skb_too_long(struct net_device* dev, struct sk_buff *skb,
@@ -1083,28 +1189,51 @@ void ipoib_cm_skb_too_long(struct net_device* dev, struct sk_buff *skb,
                queue_work(ipoib_workqueue, &priv->cm.skb_task);
 }
 
+static void ipoib_cm_rx_reap(struct work_struct *work)
+{
+       struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
+                                                  cm.rx_reap_task);
+       struct ipoib_cm_rx *p, *n;
+       LIST_HEAD(list);
+
+       spin_lock_irq(&priv->lock);
+       list_splice_init(&priv->cm.rx_reap_list, &list);
+       spin_unlock_irq(&priv->lock);
+
+       list_for_each_entry_safe(p, n, &list, list) {
+               ib_destroy_cm_id(p->id);
+               ib_destroy_qp(p->qp);
+               kfree(p);
+       }
+}
+
 static void ipoib_cm_stale_task(struct work_struct *work)
 {
        struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
                                                   cm.stale_task.work);
        struct ipoib_cm_rx *p;
-       unsigned long flags;
+       int ret;
 
-       spin_lock_irqsave(&priv->lock, flags);
+       spin_lock_irq(&priv->lock);
        while (!list_empty(&priv->cm.passive_ids)) {
-               /* List if sorted by LRU, start from tail,
+               /* List is sorted by LRU, start from tail,
                 * stop when we see a recently used entry */
                p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
                if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
                        break;
-               list_del_init(&p->list);
-               spin_unlock_irqrestore(&priv->lock, flags);
-               ib_destroy_cm_id(p->id);
-               ib_destroy_qp(p->qp);
-               kfree(p);
-               spin_lock_irqsave(&priv->lock, flags);
+               list_move(&p->list, &priv->cm.rx_error_list);
+               p->state = IPOIB_CM_RX_ERROR;
+               spin_unlock_irq(&priv->lock);
+               ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+               if (ret)
+                       ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+               spin_lock_irq(&priv->lock);
        }
-       spin_unlock_irqrestore(&priv->lock, flags);
+
+       if (!list_empty(&priv->cm.passive_ids))
+               queue_delayed_work(ipoib_workqueue,
+                                  &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
+       spin_unlock_irq(&priv->lock);
 }
 
 
@@ -1165,9 +1294,14 @@ int ipoib_cm_dev_init(struct net_device *dev)
        INIT_LIST_HEAD(&priv->cm.passive_ids);
        INIT_LIST_HEAD(&priv->cm.reap_list);
        INIT_LIST_HEAD(&priv->cm.start_list);
+       INIT_LIST_HEAD(&priv->cm.rx_error_list);
+       INIT_LIST_HEAD(&priv->cm.rx_flush_list);
+       INIT_LIST_HEAD(&priv->cm.rx_drain_list);
+       INIT_LIST_HEAD(&priv->cm.rx_reap_list);
        INIT_WORK(&priv->cm.start_task, ipoib_cm_tx_start);
        INIT_WORK(&priv->cm.reap_task, ipoib_cm_tx_reap);
        INIT_WORK(&priv->cm.skb_task, ipoib_cm_skb_reap);
+       INIT_WORK(&priv->cm.rx_reap_task, ipoib_cm_rx_reap);
        INIT_DELAYED_WORK(&priv->cm.stale_task, ipoib_cm_stale_task);
 
        skb_queue_head_init(&priv->cm.skb_queue);