ocfs2: wait for recovery when starting lock mastery
fs/ocfs2/dlm/dlmmaster.c
index 1b28d3eedac7a8164f87653e5681daaa1e16390d..ed1601d1831cf41cae0a9df6758811b5294068c9 100644
@@ -371,9 +371,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
-       BUG_ON(!atomic_read(&mle->mle_refs.refcount));
-
-       kref_put(&mle->mle_refs, dlm_mle_release);
+       if (!atomic_read(&mle->mle_refs.refcount)) {
+               /* this may or may not crash, but who cares.
+                * it's a BUG. */
+               mlog(ML_ERROR, "bad mle: %p\n", mle);
+               dlm_print_one_mle(mle);
+               BUG();
+       } else
+               kref_put(&mle->mle_refs, dlm_mle_release);
 }
 
 
@@ -614,6 +619,28 @@ static void dlm_lockres_release(struct kref *kref)
        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);
 
+       if (!hlist_unhashed(&res->hash_node) ||
+           !list_empty(&res->granted) ||
+           !list_empty(&res->converting) ||
+           !list_empty(&res->blocked) ||
+           !list_empty(&res->dirty) ||
+           !list_empty(&res->recovering) ||
+           !list_empty(&res->purge)) {
+               mlog(ML_ERROR,
+                    "Going to BUG for resource %.*s."
+                    "  We're on a list! [%c%c%c%c%c%c%c]\n",
+                    res->lockname.len, res->lockname.name,
+                    !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
+                    !list_empty(&res->granted) ? 'G' : ' ',
+                    !list_empty(&res->converting) ? 'C' : ' ',
+                    !list_empty(&res->blocked) ? 'B' : ' ',
+                    !list_empty(&res->dirty) ? 'D' : ' ',
+                    !list_empty(&res->recovering) ? 'R' : ' ',
+                    !list_empty(&res->purge) ? 'P' : ' ');
+
+               dlm_print_one_lock_resource(res);
+       }
+
        /* By the time we're ready to blow this guy away, we shouldn't
         * be on any lists. */
        BUG_ON(!hlist_unhashed(&res->hash_node));
@@ -840,6 +867,7 @@ lookup:
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
+redo_request:
        while (wait_on_recovery) {
                /* any cluster changes that occurred after dropping the
                 * dlm spinlock would be detectable be a change on the mle,
@@ -858,7 +886,7 @@ lookup:
                } 
 
                dlm_kick_recovery_thread(dlm);
-               msleep(100);
+               msleep(1000);
                dlm_wait_for_recovery(dlm);
 
                spin_lock(&dlm->spinlock);
@@ -871,13 +899,15 @@ lookup:
                } else
                        wait_on_recovery = 0;
                spin_unlock(&dlm->spinlock);
+
+               if (wait_on_recovery)
+                       dlm_wait_for_node_recovery(dlm, bit, 10000);
        }
 
        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;
 
-redo_request:
        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -902,6 +932,7 @@ wait:
        /* keep going until the response map includes all nodes */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
+               wait_on_recovery = 1;
                mlog(0, "%s:%.*s: node map changed, redo the "
                     "master request now, blocked=%d\n",
                     dlm->name, res->lockname.len,
@@ -967,12 +998,14 @@ recheck:
                spin_unlock(&res->spinlock);
                /* this will cause the master to re-assert across
                 * the whole cluster, freeing up mles */
-               ret = dlm_do_master_request(mle, res->owner);
-               if (ret < 0) {
-                       /* give recovery a chance to run */
-                       mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
-                       msleep(500);
-                       goto recheck;
+               if (res->owner != dlm->node_num) {
+                       ret = dlm_do_master_request(mle, res->owner);
+                       if (ret < 0) {
+                               /* give recovery a chance to run */
+                               mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
+                               msleep(500);
+                               goto recheck;
+                       }
                }
                ret = 0;
                goto leave;
@@ -1008,6 +1041,12 @@ recheck:
                     "rechecking now\n", dlm->name, res->lockname.len,
                     res->lockname.name);
                goto recheck;
+       } else {
+               if (!voting_done) {
+                       mlog(0, "map not changed and voting not done "
+                            "for %s:%.*s\n", dlm->name, res->lockname.len,
+                            res->lockname.name);
+               }
        }
 
        if (m != O2NM_MAX_NODES) {
@@ -1175,18 +1214,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                        set_bit(node, mle->vote_map);
                } else {
                        mlog(ML_ERROR, "node down! %d\n", node);
-
-                       /* if the node wasn't involved in mastery skip it,
-                        * but clear it out from the maps so that it will
-                        * not affect mastery of this lockres */
-                       clear_bit(node, mle->response_map);
-                       clear_bit(node, mle->vote_map);
-                       if (!test_bit(node, mle->maybe_map))
-                               goto next;
-
-                       /* if we're already blocked on lock mastery, and the
-                        * dead node wasn't the expected master, or there is
-                        * another node in the maybe_map, keep waiting */
                        if (blocked) {
                                int lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES, 0);
@@ -1194,54 +1221,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                /* act like it was never there */
                                clear_bit(node, mle->maybe_map);
 
-                               if (node != lowest)
-                                       goto next;
-
-                               mlog(ML_ERROR, "expected master %u died while "
-                                    "this node was blocked waiting on it!\n",
-                                    node);
-                               lowest = find_next_bit(mle->maybe_map,
-                                                      O2NM_MAX_NODES,
-                                                      lowest+1);
-                               if (lowest < O2NM_MAX_NODES) {
-                                       mlog(0, "still blocked. waiting "
-                                            "on %u now\n", lowest);
-                                       goto next;
+                               if (node == lowest) {
+                                       mlog(0, "expected master %u died"
+                                           " while this node was blocked "
+                                           "waiting on it!\n", node);
+                                       lowest = find_next_bit(mle->maybe_map,
+                                                       O2NM_MAX_NODES,
+                                                       lowest+1);
+                                       if (lowest < O2NM_MAX_NODES) {
+                                               mlog(0, "%s:%.*s:still "
+                                                    "blocked. waiting on %u "
+                                                    "now\n", dlm->name,
+                                                    res->lockname.len,
+                                                    res->lockname.name,
+                                                    lowest);
+                                       } else {
+                                               /* mle is an MLE_BLOCK, but
+                                                * there is now nothing left to
+                                                * block on.  we need to return
+                                                * all the way back out and try
+                                                * again with an MLE_MASTER.
+                                                * dlm_do_local_recovery_cleanup
+                                                * has already run, so the mle
+                                                * refcount is ok */
+                                               mlog(0, "%s:%.*s: no "
+                                                    "longer blocking. try to "
+                                                    "master this here\n",
+                                                    dlm->name,
+                                                    res->lockname.len,
+                                                    res->lockname.name);
+                                               mle->type = DLM_MLE_MASTER;
+                                               mle->u.res = res;
+                                       }
                                }
-
-                               /* mle is an MLE_BLOCK, but there is now
-                                * nothing left to block on.  we need to return
-                                * all the way back out and try again with
-                                * an MLE_MASTER. dlm_do_local_recovery_cleanup
-                                * has already run, so the mle refcount is ok */
-                               mlog(0, "no longer blocking. we can "
-                                    "try to master this here\n");
-                               mle->type = DLM_MLE_MASTER;
-                               memset(mle->maybe_map, 0,
-                                      sizeof(mle->maybe_map));
-                               memset(mle->response_map, 0,
-                                      sizeof(mle->maybe_map));
-                               memcpy(mle->vote_map, mle->node_map,
-                                      sizeof(mle->node_map));
-                               mle->u.res = res;
-                               set_bit(dlm->node_num, mle->maybe_map);
-
-                               ret = -EAGAIN;
-                               goto next;
                        }
 
-                       clear_bit(node, mle->maybe_map);
-                       if (node > dlm->node_num)
-                               goto next;
-
-                       mlog(0, "dead node in map!\n");
-                       /* yuck. go back and re-contact all nodes
-                        * in the vote_map, removing this node. */
-                       memset(mle->response_map, 0,
-                              sizeof(mle->response_map));
+                       /* now blank out everything, as if we had never
+                        * contacted anyone */
+                       memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
+                       memset(mle->response_map, 0, sizeof(mle->response_map));
+                       /* reset the vote_map to the current node_map */
+                       memcpy(mle->vote_map, mle->node_map,
+                              sizeof(mle->node_map));
+                       /* put myself into the maybe map */
+                       if (mle->type != DLM_MLE_BLOCK)
+                               set_bit(dlm->node_num, mle->maybe_map);
                }
                ret = -EAGAIN;
-next:
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        }
        return ret;
@@ -1600,6 +1626,8 @@ again:
        dlm_node_iter_init(nodemap, &iter);
        while ((to = dlm_node_iter_next(&iter)) >= 0) {
                int r = 0;
+               struct dlm_master_list_entry *mle = NULL;
+
                mlog(0, "sending assert master to %d (%.*s)\n", to,
                     namelen, lockname);
                memset(&assert, 0, sizeof(assert));
@@ -1624,7 +1652,15 @@ again:
                        /* ok, something horribly messed.  kill thyself. */
                        mlog(ML_ERROR,"during assert master of %.*s to %u, "
                             "got %d.\n", namelen, lockname, to, r);
-                       dlm_dump_lock_resources(dlm);
+                       spin_lock(&dlm->spinlock);
+                       spin_lock(&dlm->master_lock);
+                       if (dlm_find_mle(dlm, &mle, (char *)lockname,
+                                        namelen)) {
+                               dlm_print_one_mle(mle);
+                               __dlm_put_mle(mle);
+                       }
+                       spin_unlock(&dlm->master_lock);
+                       spin_unlock(&dlm->spinlock);
                        BUG();
                } else if (r == EAGAIN) {
                        mlog(0, "%.*s: node %u create mles on other "
@@ -1691,7 +1727,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
                if (bit >= O2NM_MAX_NODES) {
                        /* not necessarily an error, though less likely.
                         * could be master just re-asserting. */
-                       mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
+                       mlog(0, "no bits set in the maybe_map, but %u "
                             "is asserting! (%.*s)\n", assert->node_idx,
                             namelen, name);
                } else if (bit != assert->node_idx) {
@@ -1703,7 +1739,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
                                 * number winning the mastery will respond
                                 * YES to mastery requests, but this node
                                 * had no way of knowing.  let it pass. */
-                               mlog(ML_ERROR, "%u is the lowest node, "
+                               mlog(0, "%u is the lowest node, "
                                     "%u is asserting. (%.*s)  %u must "
                                     "have begun after %u won.\n", bit,
                                     assert->node_idx, namelen, name, bit,
@@ -1889,12 +1925,12 @@ done:
 
 kill:
        /* kill the caller! */
+       mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
+            "and killing the other node now!  This node is OK and can continue.\n");
+       __dlm_print_one_lock_resource(res);
        spin_unlock(&res->spinlock);
        spin_unlock(&dlm->spinlock);
        dlm_lockres_put(res);
-       mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
-            "and killing the other node now!  This node is OK and can continue.\n");
-       dlm_dump_lock_resources(dlm);
        dlm_put(dlm);
        return -EINVAL;
 }
@@ -1967,6 +2003,23 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
                }
        }
 
+       /*
+        * If we're migrating this lock to someone else, we are no
+        * longer allowed to assert our own mastery.  OTOH, we need to
+        * prevent migration from starting while we're still asserting
+        * our dominance.  The reserved ast delays migration.
+        */
+       spin_lock(&res->spinlock);
+       if (res->state & DLM_LOCK_RES_MIGRATING) {
+               mlog(0, "Someone asked us to assert mastery, but we're "
+                    "in the middle of migration.  Skipping assert, "
+                    "the new master will handle that.\n");
+               spin_unlock(&res->spinlock);
+               goto put;
+       } else
+               __dlm_lockres_reserve_ast(res);
+       spin_unlock(&res->spinlock);
+
        /* this call now finishes out the nodemap
         * even if one or more nodes die */
        mlog(0, "worker about to master %.*s here, this=%u\n",
@@ -1979,6 +2032,10 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
                mlog_errno(ret);
        }
 
+       /* Ok, we've asserted ourselves.  Let's let migration start. */
+       dlm_lockres_release_ast(dlm, res);
+
+put:
        dlm_lockres_put(res);
 
        mlog(0, "finished with dlm_assert_master_worker\n");
@@ -2017,6 +2074,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
                                BUG();
                        /* host is down, so answer for that node would be
                         * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
+                       ret = 0;
                }
 
                if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
@@ -2268,8 +2326,8 @@ fail:
                        /* avoid hang during shutdown when migrating lockres 
                         * to a node which also goes down */
                        if (dlm_is_node_dead(dlm, target)) {
-                               mlog(0, "%s:%.*s: expected migration target %u "
-                                    "is no longer up.  restarting.\n",
+                               mlog(0, "%s:%.*s: expected migration "
+                                    "target %u is no longer up, restarting\n",
                                     dlm->name, res->lockname.len,
                                     res->lockname.name, target);
                                ret = -ERESTARTSYS;
@@ -2790,8 +2848,8 @@ top:
                spin_unlock(&mle->spinlock);
                wake_up(&mle->wq);
 
-               mlog(0, "node %u died during migration from "
-                    "%u to %u!\n", dead_node,
+               mlog(0, "%s: node %u died during migration from "
+                    "%u to %u!\n", dlm->name, dead_node,
                     mle->master, mle->new_master);
                /* if there is a lockres associated with this
                 * mle, find it and set its owner to UNKNOWN */