X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=fs%2Focfs2%2Fdlm%2Fdlmmaster.c;h=ed1601d1831cf41cae0a9df6758811b5294068c9;hb=b7084ab538ac2bd71ce494cf1cbbea9fe9db2c07;hp=1b28d3eedac7a8164f87653e5681daaa1e16390d;hpb=dc2ed195dda848c8e4b24f3f0e1952fa74f74f6b;p=linux-2.6 diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 1b28d3eeda..ed1601d183 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -371,9 +371,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - BUG_ON(!atomic_read(&mle->mle_refs.refcount)); - - kref_put(&mle->mle_refs, dlm_mle_release); + if (!atomic_read(&mle->mle_refs.refcount)) { + /* this may or may not crash, but who cares. + * it's a BUG. */ + mlog(ML_ERROR, "bad mle: %p\n", mle); + dlm_print_one_mle(mle); + BUG(); + } else + kref_put(&mle->mle_refs, dlm_mle_release); } @@ -614,6 +619,28 @@ static void dlm_lockres_release(struct kref *kref) mlog(0, "destroying lockres %.*s\n", res->lockname.len, res->lockname.name); + if (!hlist_unhashed(&res->hash_node) || + !list_empty(&res->granted) || + !list_empty(&res->converting) || + !list_empty(&res->blocked) || + !list_empty(&res->dirty) || + !list_empty(&res->recovering) || + !list_empty(&res->purge)) { + mlog(ML_ERROR, + "Going to BUG for resource %.*s." + " We're on a list! [%c%c%c%c%c%c%c]\n", + res->lockname.len, res->lockname.name, + !hlist_unhashed(&res->hash_node) ? 'H' : ' ', + !list_empty(&res->granted) ? 'G' : ' ', + !list_empty(&res->converting) ? 'C' : ' ', + !list_empty(&res->blocked) ? 'B' : ' ', + !list_empty(&res->dirty) ? 'D' : ' ', + !list_empty(&res->recovering) ? 'R' : ' ', + !list_empty(&res->purge) ? 'P' : ' '); + + dlm_print_one_lock_resource(res); + } + /* By the time we're ready to blow this guy away, we shouldn't * be on any lists. */ BUG_ON(!hlist_unhashed(&res->hash_node)); @@ -840,6 +867,7 @@ lookup: spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); +redo_request: while (wait_on_recovery) { /* any cluster changes that occurred after dropping the * dlm spinlock would be detectable be a change on the mle, @@ -858,7 +886,7 @@ lookup: } dlm_kick_recovery_thread(dlm); - msleep(100); + msleep(1000); dlm_wait_for_recovery(dlm); spin_lock(&dlm->spinlock); @@ -871,13 +899,15 @@ lookup: } else wait_on_recovery = 0; spin_unlock(&dlm->spinlock); + + if (wait_on_recovery) + dlm_wait_for_node_recovery(dlm, bit, 10000); } /* must wait for lock to be mastered elsewhere */ if (blocked) goto wait; -redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { @@ -902,6 +932,7 @@ wait: /* keep going until the response map includes all nodes */ ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); if (ret < 0) { + wait_on_recovery = 1; mlog(0, "%s:%.*s: node map changed, redo the " "master request now, blocked=%d\n", dlm->name, res->lockname.len, @@ -967,12 +998,14 @@ recheck: spin_unlock(&res->spinlock); /* this will cause the master to re-assert across * the whole cluster, freeing up mles */ - ret = dlm_do_master_request(mle, res->owner); - if (ret < 0) { - /* give recovery a chance to run */ - mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); - msleep(500); - goto recheck; + if (res->owner != dlm->node_num) { + ret = dlm_do_master_request(mle, res->owner); + if (ret < 0) { + /* give recovery a chance to run */ + mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); + msleep(500); + goto recheck; + } } ret = 0; goto leave; @@ -1008,6 +1041,12 @@ recheck: "rechecking now\n", dlm->name, res->lockname.len, res->lockname.name); goto recheck; + } else { + if (!voting_done) { + mlog(0, "map not changed and voting not done " + "for %s:%.*s\n", dlm->name, res->lockname.len, + res->lockname.name); + } } if (m != O2NM_MAX_NODES) { @@ -1175,18 +1214,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, set_bit(node, mle->vote_map); } else { mlog(ML_ERROR, "node down! %d\n", node); - - /* if the node wasn't involved in mastery skip it, - * but clear it out from the maps so that it will - * not affect mastery of this lockres */ - clear_bit(node, mle->response_map); - clear_bit(node, mle->vote_map); - if (!test_bit(node, mle->maybe_map)) - goto next; - - /* if we're already blocked on lock mastery, and the - * dead node wasn't the expected master, or there is - * another node in the maybe_map, keep waiting */ if (blocked) { int lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); @@ -1194,54 +1221,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, /* act like it was never there */ clear_bit(node, mle->maybe_map); - if (node != lowest) - goto next; - - mlog(ML_ERROR, "expected master %u died while " - "this node was blocked waiting on it!\n", - node); - lowest = find_next_bit(mle->maybe_map, - O2NM_MAX_NODES, - lowest+1); - if (lowest < O2NM_MAX_NODES) { - mlog(0, "still blocked. waiting " - "on %u now\n", lowest); - goto next; + if (node == lowest) { + mlog(0, "expected master %u died" + " while this node was blocked " + "waiting on it!\n", node); + lowest = find_next_bit(mle->maybe_map, + O2NM_MAX_NODES, + lowest+1); + if (lowest < O2NM_MAX_NODES) { + mlog(0, "%s:%.*s:still " + "blocked. waiting on %u " + "now\n", dlm->name, + res->lockname.len, + res->lockname.name, + lowest); + } else { + /* mle is an MLE_BLOCK, but + * there is now nothing left to + * block on. we need to return + * all the way back out and try + * again with an MLE_MASTER. + * dlm_do_local_recovery_cleanup + * has already run, so the mle + * refcount is ok */ + mlog(0, "%s:%.*s: no " + "longer blocking. try to " + "master this here\n", + dlm->name, + res->lockname.len, + res->lockname.name); + mle->type = DLM_MLE_MASTER; + mle->u.res = res; + } } - - /* mle is an MLE_BLOCK, but there is now - * nothing left to block on. we need to return - * all the way back out and try again with - * an MLE_MASTER. dlm_do_local_recovery_cleanup - * has already run, so the mle refcount is ok */ - mlog(0, "no longer blocking. we can " - "try to master this here\n"); - mle->type = DLM_MLE_MASTER; - memset(mle->maybe_map, 0, - sizeof(mle->maybe_map)); - memset(mle->response_map, 0, - sizeof(mle->maybe_map)); - memcpy(mle->vote_map, mle->node_map, - sizeof(mle->node_map)); - mle->u.res = res; - set_bit(dlm->node_num, mle->maybe_map); - - ret = -EAGAIN; - goto next; } - clear_bit(node, mle->maybe_map); - if (node > dlm->node_num) - goto next; - - mlog(0, "dead node in map!\n"); - /* yuck. go back and re-contact all nodes - * in the vote_map, removing this node. */ - memset(mle->response_map, 0, - sizeof(mle->response_map)); + /* now blank out everything, as if we had never + * contacted anyone */ + memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); + memset(mle->response_map, 0, sizeof(mle->response_map)); + /* reset the vote_map to the current node_map */ + memcpy(mle->vote_map, mle->node_map, + sizeof(mle->node_map)); + /* put myself into the maybe map */ + if (mle->type != DLM_MLE_BLOCK) + set_bit(dlm->node_num, mle->maybe_map); } ret = -EAGAIN; -next: node = dlm_bitmap_diff_iter_next(&bdi, &sc); } return ret; @@ -1600,6 +1626,8 @@ again: dlm_node_iter_init(nodemap, &iter); while ((to = dlm_node_iter_next(&iter)) >= 0) { int r = 0; + struct dlm_master_list_entry *mle = NULL; + mlog(0, "sending assert master to %d (%.*s)\n", to, namelen, lockname); memset(&assert, 0, sizeof(assert)); @@ -1624,7 +1652,15 @@ again: /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " "got %d.\n", namelen, lockname, to, r); - dlm_dump_lock_resources(dlm); + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + if (dlm_find_mle(dlm, &mle, (char *)lockname, + namelen)) { + dlm_print_one_mle(mle); + __dlm_put_mle(mle); + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); BUG(); } else if (r == EAGAIN) { mlog(0, "%.*s: node %u create mles on other " @@ -1691,7 +1727,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) if (bit >= O2NM_MAX_NODES) { /* not necessarily an error, though less likely. * could be master just re-asserting. */ - mlog(ML_ERROR, "no bits set in the maybe_map, but %u " + mlog(0, "no bits set in the maybe_map, but %u " "is asserting! (%.*s)\n", assert->node_idx, namelen, name); } else if (bit != assert->node_idx) { @@ -1703,7 +1739,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) * number winning the mastery will respond * YES to mastery requests, but this node * had no way of knowing. let it pass. */ - mlog(ML_ERROR, "%u is the lowest node, " + mlog(0, "%u is the lowest node, " "%u is asserting. (%.*s) %u must " "have begun after %u won.\n", bit, assert->node_idx, namelen, name, bit, @@ -1889,12 +1925,12 @@ done: kill: /* kill the caller! */ + mlog(ML_ERROR, "Bad message received from another node. Dumping state " + "and killing the other node now! This node is OK and can continue.\n"); + __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); dlm_lockres_put(res); - mlog(ML_ERROR, "Bad message received from another node. Dumping state " - "and killing the other node now! This node is OK and can continue.\n"); - dlm_dump_lock_resources(dlm); dlm_put(dlm); return -EINVAL; } @@ -1967,6 +2003,23 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) } } + /* + * If we're migrating this lock to someone else, we are no + * longer allowed to assert out own mastery. OTOH, we need to + * prevent migration from starting while we're still asserting + * our dominance. The reserved ast delays migration. + */ + spin_lock(&res->spinlock); + if (res->state & DLM_LOCK_RES_MIGRATING) { + mlog(0, "Someone asked us to assert mastery, but we're " + "in the middle of migration. Skipping assert, " + "the new master will handle that.\n"); + spin_unlock(&res->spinlock); + goto put; + } else + __dlm_lockres_reserve_ast(res); + spin_unlock(&res->spinlock); + /* this call now finishes out the nodemap * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", @@ -1979,6 +2032,10 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) mlog_errno(ret); } + /* Ok, we've asserted ourselves. Let's let migration start. */ + dlm_lockres_release_ast(dlm, res); + +put: dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n"); @@ -2017,6 +2074,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, BUG(); /* host is down, so answer for that node would be * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ + ret = 0; } if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { @@ -2268,8 +2326,8 @@ fail: /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { - mlog(0, "%s:%.*s: expected migration target %u " - "is no longer up. restarting.\n", + mlog(0, "%s:%.*s: expected migration " + "target %u is no longer up, restarting\n", dlm->name, res->lockname.len, res->lockname.name, target); ret = -ERESTARTSYS; @@ -2790,8 +2848,8 @@ top: spin_unlock(&mle->spinlock); wake_up(&mle->wq); - mlog(0, "node %u died during migration from " - "%u to %u!\n", dead_node, + mlog(0, "%s: node %u died during migration from " + "%u to %u!\n", dlm->name, dead_node, mle->master, mle->new_master); /* if there is a lockres associated with this * mle, find it and set its owner to UNKNOWN */