err.no Git - linux-2.6/blobdiff - fs/ocfs2/dlm/dlmrecovery.c
ocfs2: only recover one dead node at a time
[linux-2.6] / fs / ocfs2 / dlm / dlmrecovery.c
index 805cbabac051d5007c492ac5de04f4f81c84e832..39488763728936e706dfb952f07e3f6b5963c0e5 100644 (file)
@@ -115,12 +115,31 @@ static u64 dlm_get_next_mig_cookie(void)
        return c;
 }
 
+static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
+                                         u8 dead_node)
+{
+       assert_spin_locked(&dlm->spinlock);
+       if (dlm->reco.dead_node != dead_node)
+               mlog(0, "%s: changing dead_node from %u to %u\n",
+                    dlm->name, dlm->reco.dead_node, dead_node);
+       dlm->reco.dead_node = dead_node;
+}
+
+static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
+                                      u8 master)
+{
+       assert_spin_locked(&dlm->spinlock);
+       mlog(0, "%s: changing new_master from %u to %u\n",
+            dlm->name, dlm->reco.new_master, master);
+       dlm->reco.new_master = master;
+}
+
 static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
 {
        spin_lock(&dlm->spinlock);
        clear_bit(dlm->reco.dead_node, dlm->recovery_map);
-       dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
-       dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+       dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+       dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
        spin_unlock(&dlm->spinlock);
 }
 
@@ -267,7 +286,7 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
 {
        int dead;
        spin_lock(&dlm->spinlock);
-       dead = test_bit(node, dlm->domain_map);
+       dead = !test_bit(node, dlm->domain_map);
        spin_unlock(&dlm->spinlock);
        return dead;
 }
@@ -341,7 +360,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
                mlog(0, "new master %u died while recovering %u!\n",
                     dlm->reco.new_master, dlm->reco.dead_node);
                /* unset the new_master, leave dead_node */
-               dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+               dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
        }
 
        /* select a target to recover */
@@ -350,14 +369,14 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
 
                bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
                if (bit >= O2NM_MAX_NODES || bit < 0)
-                       dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+                       dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
                else
-                       dlm->reco.dead_node = bit;
+                       dlm_set_reco_dead_node(dlm, bit);
        } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
                /* BUG? */
                mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
                     dlm->reco.dead_node);
-               dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+               dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
        }
 
        if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
@@ -691,6 +710,14 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
        if (!dlm_grab(dlm))
                return -EINVAL;
 
+       if (lr->dead_node != dlm->reco.dead_node) {
+               mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
+                    "dead_node is %u\n", dlm->name, lr->node_idx,
+                    lr->dead_node, dlm->reco.dead_node);
+               /* this is a hack */
+               dlm_put(dlm);
+               return -ENOMEM;
+       }
        BUG_ON(lr->dead_node != dlm->reco.dead_node);
 
        item = kcalloc(1, sizeof(*item), GFP_KERNEL);
@@ -905,13 +932,11 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
                        mlog(0, "found lockres owned by dead node while "
                                  "doing recovery for node %u. sending it.\n",
                                  dead_node);
-                       list_del_init(&res->recovering);
-                       list_add_tail(&res->recovering, list);
+                       list_move_tail(&res->recovering, list);
                } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
                        mlog(0, "found UNKNOWN owner while doing recovery "
                                  "for node %u. sending it.\n", dead_node);
-                       list_del_init(&res->recovering);
-                       list_add_tail(&res->recovering, list);
+                       list_move_tail(&res->recovering, list);
                }
        }
        spin_unlock(&dlm->spinlock);
@@ -1023,8 +1048,9 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
                    ml->type == LKM_PRMODE) {
                        /* if it is already set, this had better be a PR
                         * and it has to match */
-                       if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
-                           memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+                       if (!dlm_lvb_is_empty(mres->lvb) &&
+                           (ml->type == LKM_EXMODE ||
+                            memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
                                mlog(ML_ERROR, "mismatched lvbs!\n");
                                __dlm_print_one_lock_resource(lock->lockres);
                                BUG();
@@ -1406,6 +1432,7 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
        struct dlm_ctxt *dlm = data;
        struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
        struct dlm_lock_resource *res = NULL;
+       unsigned int hash;
        int master = DLM_LOCK_RES_OWNER_UNKNOWN;
        u32 flags = DLM_ASSERT_MASTER_REQUERY;
 
@@ -1415,8 +1442,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
                return master;
        }
 
+       hash = dlm_lockid_hash(req->name, req->namelen);
+
        spin_lock(&dlm->spinlock);
-       res = __dlm_lookup_lockres(dlm, req->name, req->namelen);
+       res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
        if (res) {
                spin_lock(&res->spinlock);
                master = res->owner;
@@ -1483,7 +1512,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        int ret = 0;
-       int i;
+       int i, bad;
        struct list_head *iter;
        struct dlm_lock *lock = NULL;
 
@@ -1529,8 +1558,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 
                        /* move the lock to its proper place */
                        /* do not alter lock refcount.  switching lists. */
-                       list_del_init(&lock->list);
-                       list_add_tail(&lock->list, queue);
+                       list_move_tail(&lock->list, queue);
                        spin_unlock(&res->spinlock);
 
                        mlog(0, "just reordered a local lock!\n");
@@ -1554,7 +1582,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                lksb->flags |= (ml->flags &
                                (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
                        
-               if (mres->lvb[0]) {
+               if (!dlm_lvb_is_empty(mres->lvb)) {
                        if (lksb->flags & DLM_LKSB_PUT_LVB) {
                                /* other node was trying to update
                                 * lvb when node died.  recreate the
@@ -1565,8 +1593,9 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                                 * most recent valid lvb info */
                                BUG_ON(ml->type != LKM_EXMODE &&
                                       ml->type != LKM_PRMODE);
-                               if (res->lvb[0] && (ml->type == LKM_EXMODE ||
-                                   memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+                               if (!dlm_lvb_is_empty(res->lvb) &&
+                                   (ml->type == LKM_EXMODE ||
+                                    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
                                        mlog(ML_ERROR, "received bad lvb!\n");
                                        __dlm_print_one_lock_resource(res);
                                        BUG();
@@ -1592,9 +1621,33 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                 * relative to each other, but clearly *not*
                 * preserved relative to locks from other nodes.
                 */
+               bad = 0;
                spin_lock(&res->spinlock);
-               dlm_lock_get(newlock);
-               list_add_tail(&newlock->list, queue);
+               list_for_each_entry(lock, queue, list) {
+                       if (lock->ml.cookie == ml->cookie) {
+                               u64 c = lock->ml.cookie;
+                               mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
+                                    "exists on this lockres!\n", dlm->name,
+                                    res->lockname.len, res->lockname.name,
+                                    dlm_get_lock_cookie_node(c),
+                                    dlm_get_lock_cookie_seq(c));
+
+                               mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
+                                    "node=%u, cookie=%u:%llu, queue=%d\n",
+                                    ml->type, ml->convert_type, ml->node,
+                                    dlm_get_lock_cookie_node(ml->cookie),
+                                    dlm_get_lock_cookie_seq(ml->cookie),
+                                    ml->list);
+
+                               __dlm_print_one_lock_resource(res);
+                               bad = 1;
+                               break;
+                       }
+               }
+               if (!bad) {
+                       dlm_lock_get(newlock);
+                       list_add_tail(&newlock->list, queue);
+               }
                spin_unlock(&res->spinlock);
        }
        mlog(0, "done running all the locks\n");
@@ -1719,7 +1772,7 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
         * the RECOVERING state and set the owner
         * if necessary */
        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-               bucket = &(dlm->lockres_hash[i]);
+               bucket = dlm_lockres_hash(dlm, i);
                hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
                        if (res->state & DLM_LOCK_RES_RECOVERING) {
                                if (res->owner == dead_node) {
@@ -1884,7 +1937,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
         *    need to be fired as a result.
         */
        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-               bucket = &(dlm->lockres_hash[i]);
+               bucket = dlm_lockres_hash(dlm, i);
                hlist_for_each_entry(res, iter, bucket, hash_node) {
                        /* always prune any $RECOVERY entries for dead nodes,
                         * otherwise hangs can occur during later recovery */
@@ -2087,7 +2140,7 @@ again:
 
                        /* set the new_master to this node */
                        spin_lock(&dlm->spinlock);
-                       dlm->reco.new_master = dlm->node_num;
+                       dlm_set_reco_master(dlm, dlm->node_num);
                        spin_unlock(&dlm->spinlock);
                }
 
@@ -2252,8 +2305,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
                     "node %u changing it to %u\n", dlm->name, 
                     dlm->reco.dead_node, br->node_idx, br->dead_node);
        }
-       dlm->reco.new_master = br->node_idx;
-       dlm->reco.dead_node = br->dead_node;
+       dlm_set_reco_master(dlm, br->node_idx);
+       dlm_set_reco_dead_node(dlm, br->dead_node);
        if (!test_bit(br->dead_node, dlm->recovery_map)) {
                mlog(0, "recovery master %u sees %u as dead, but this "
                     "node has not yet.  marking %u as dead\n",