X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=fs%2Focfs2%2Fdlm%2Fdlmmaster.c;h=ed1601d1831cf41cae0a9df6758811b5294068c9;hb=b7084ab538ac2bd71ce494cf1cbbea9fe9db2c07;hp=940be4c13b1f09ff4703662f007692a6d4b81e89;hpb=74d89c16735d83349ea74232031819e989a49156;p=linux-2.6 diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 940be4c13b..ed1601d183 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -74,6 +74,7 @@ struct dlm_master_list_entry wait_queue_head_t wq; atomic_t woken; struct kref mle_refs; + int inuse; unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; @@ -130,15 +131,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, #if 0 /* Code here is included but defined out as it aids debugging */ +#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m) +void _dlm_print_nodemap(unsigned long *map, const char *mapname) +{ + int i; + printk("%s=[ ", mapname); + for (i=0; imaybe_map, + *vote = mle->vote_map, + *resp = mle->response_map, + *node = mle->node_map; k = &mle->mle_refs; if (mle->type == DLM_MLE_BLOCK) @@ -159,9 +175,18 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle) name = mle->u.res->lockname.name; } - mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", - i, type, refs, master, mle->new_master, attached, - namelen, namelen, name); + mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ", + namelen, name, type, refs, master, mle->new_master, attached, + mle->inuse); + dlm_print_nodemap(maybe); + printk(", "); + dlm_print_nodemap(vote); + printk(", "); + dlm_print_nodemap(resp); + printk(", "); + dlm_print_nodemap(node); + printk(", "); + printk("\n"); } static void dlm_dump_mles(struct dlm_ctxt *dlm) @@ -170,7 +195,6 @@ static void dlm_dump_mles(struct dlm_ctxt *dlm) struct list_head *iter; mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); - mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n"); spin_lock(&dlm->master_lock); list_for_each(iter, &dlm->master_list) { mle = list_entry(iter, struct dlm_master_list_entry, list); @@ -314,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } +static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&dlm->master_lock); + mle->inuse++; + kref_get(&mle->mle_refs); +} + +static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + mle->inuse--; + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + +} + /* remove from list and free */ static void __dlm_put_mle(struct dlm_master_list_entry *mle) { @@ -322,9 +371,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - BUG_ON(!atomic_read(&mle->mle_refs.refcount)); - - kref_put(&mle->mle_refs, dlm_mle_release); + if (!atomic_read(&mle->mle_refs.refcount)) { + /* this may or may not crash, but who cares. + * it's a BUG. */ + mlog(ML_ERROR, "bad mle: %p\n", mle); + dlm_print_one_mle(mle); + BUG(); + } else + kref_put(&mle->mle_refs, dlm_mle_release); } @@ -367,6 +421,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, memset(mle->response_map, 0, sizeof(mle->response_map)); mle->master = O2NM_MAX_NODES; mle->new_master = O2NM_MAX_NODES; + mle->inuse = 0; if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); @@ -564,6 +619,28 @@ static void dlm_lockres_release(struct kref *kref) mlog(0, "destroying lockres %.*s\n", res->lockname.len, res->lockname.name); + if (!hlist_unhashed(&res->hash_node) || + !list_empty(&res->granted) || + !list_empty(&res->converting) || + !list_empty(&res->blocked) || + !list_empty(&res->dirty) || + !list_empty(&res->recovering) || + !list_empty(&res->purge)) { + mlog(ML_ERROR, + "Going to BUG for resource %.*s." + " We're on a list! [%c%c%c%c%c%c%c]\n", + res->lockname.len, res->lockname.name, + !hlist_unhashed(&res->hash_node) ? 'H' : ' ', + !list_empty(&res->granted) ? 'G' : ' ', + !list_empty(&res->converting) ? 'C' : ' ', + !list_empty(&res->blocked) ? 'B' : ' ', + !list_empty(&res->dirty) ? 'D' : ' ', + !list_empty(&res->recovering) ? 'R' : ' ', + !list_empty(&res->purge) ? 'P' : ' '); + + dlm_print_one_lock_resource(res); + } + /* By the time we're ready to blow this guy away, we shouldn't * be on any lists. */ BUG_ON(!hlist_unhashed(&res->hash_node)); @@ -579,11 +656,6 @@ static void dlm_lockres_release(struct kref *kref) kfree(res); } -void dlm_lockres_get(struct dlm_lock_resource *res) -{ - kref_get(&res->refs); -} - void dlm_lockres_put(struct dlm_lock_resource *res) { kref_put(&res->refs, dlm_lockres_release); @@ -603,7 +675,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, memcpy(qname, name, namelen); res->lockname.len = namelen; - res->lockname.hash = full_name_hash(name, namelen); + res->lockname.hash = dlm_lockid_hash(name, namelen); init_waitqueue_head(&res->wq); spin_lock_init(&res->spinlock); @@ -677,19 +749,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, int blocked = 0; int ret, nodenum; struct dlm_node_iter iter; - unsigned int namelen; + unsigned int namelen, hash; int tries = 0; int bit, wait_on_recovery = 0; BUG_ON(!lockid); namelen = strlen(lockid); + hash = dlm_lockid_hash(lockid, namelen); mlog(0, "get lockres %s (len %d)\n", lockid, namelen); lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); + tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); if (tmpres) { spin_unlock(&dlm->spinlock); mlog(0, "found in hash!\n"); @@ -790,10 +863,11 @@ lookup: * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we * need an extra one to keep from a bad ptr deref. */ - dlm_get_mle(mle); + dlm_get_mle_inuse(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); +redo_request: while (wait_on_recovery) { /* any cluster changes that occurred after dropping the * dlm spinlock would be detectable be a change on the mle, @@ -812,7 +886,7 @@ lookup: } dlm_kick_recovery_thread(dlm); - msleep(100); + msleep(1000); dlm_wait_for_recovery(dlm); spin_lock(&dlm->spinlock); @@ -825,13 +899,15 @@ lookup: } else wait_on_recovery = 0; spin_unlock(&dlm->spinlock); + + if (wait_on_recovery) + dlm_wait_for_node_recovery(dlm, bit, 10000); } /* must wait for lock to be mastered elsewhere */ if (blocked) goto wait; -redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { @@ -856,6 +932,7 @@ wait: /* keep going until the response map includes all nodes */ ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); if (ret < 0) { + wait_on_recovery = 1; mlog(0, "%s:%.*s: node map changed, redo the " "master request now, blocked=%d\n", dlm->name, res->lockname.len, @@ -880,7 +957,7 @@ wait: dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); /* put the extra ref */ - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); wake_waiters: spin_lock(&res->spinlock); @@ -921,12 +998,14 @@ recheck: spin_unlock(&res->spinlock); /* this will cause the master to re-assert across * the whole cluster, freeing up mles */ - ret = dlm_do_master_request(mle, res->owner); - if (ret < 0) { - /* give recovery a chance to run */ - mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); - msleep(500); - goto recheck; + if (res->owner != dlm->node_num) { + ret = dlm_do_master_request(mle, res->owner); + if (ret < 0) { + /* give recovery a chance to run */ + mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); + msleep(500); + goto recheck; + } } ret = 0; goto leave; @@ -962,6 +1041,12 @@ recheck: "rechecking now\n", dlm->name, res->lockname.len, res->lockname.name); goto recheck; + } else { + if (!voting_done) { + mlog(0, "map not changed and voting not done " + "for %s:%.*s\n", dlm->name, res->lockname.len, + res->lockname.name); + } } if (m != O2NM_MAX_NODES) { @@ -1129,18 +1214,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, set_bit(node, mle->vote_map); } else { mlog(ML_ERROR, "node down! %d\n", node); - - /* if the node wasn't involved in mastery skip it, - * but clear it out from the maps so that it will - * not affect mastery of this lockres */ - clear_bit(node, mle->response_map); - clear_bit(node, mle->vote_map); - if (!test_bit(node, mle->maybe_map)) - goto next; - - /* if we're already blocked on lock mastery, and the - * dead node wasn't the expected master, or there is - * another node in the maybe_map, keep waiting */ if (blocked) { int lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); @@ -1148,54 +1221,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, /* act like it was never there */ clear_bit(node, mle->maybe_map); - if (node != lowest) - goto next; - - mlog(ML_ERROR, "expected master %u died while " - "this node was blocked waiting on it!\n", - node); - lowest = find_next_bit(mle->maybe_map, - O2NM_MAX_NODES, - lowest+1); - if (lowest < O2NM_MAX_NODES) { - mlog(0, "still blocked. waiting " - "on %u now\n", lowest); - goto next; + if (node == lowest) { + mlog(0, "expected master %u died" + " while this node was blocked " + "waiting on it!\n", node); + lowest = find_next_bit(mle->maybe_map, + O2NM_MAX_NODES, + lowest+1); + if (lowest < O2NM_MAX_NODES) { + mlog(0, "%s:%.*s:still " + "blocked. waiting on %u " + "now\n", dlm->name, + res->lockname.len, + res->lockname.name, + lowest); + } else { + /* mle is an MLE_BLOCK, but + * there is now nothing left to + * block on. we need to return + * all the way back out and try + * again with an MLE_MASTER. + * dlm_do_local_recovery_cleanup + * has already run, so the mle + * refcount is ok */ + mlog(0, "%s:%.*s: no " + "longer blocking. try to " + "master this here\n", + dlm->name, + res->lockname.len, + res->lockname.name); + mle->type = DLM_MLE_MASTER; + mle->u.res = res; + } } - - /* mle is an MLE_BLOCK, but there is now - * nothing left to block on. we need to return - * all the way back out and try again with - * an MLE_MASTER. dlm_do_local_recovery_cleanup - * has already run, so the mle refcount is ok */ - mlog(0, "no longer blocking. we can " - "try to master this here\n"); - mle->type = DLM_MLE_MASTER; - memset(mle->maybe_map, 0, - sizeof(mle->maybe_map)); - memset(mle->response_map, 0, - sizeof(mle->maybe_map)); - memcpy(mle->vote_map, mle->node_map, - sizeof(mle->node_map)); - mle->u.res = res; - set_bit(dlm->node_num, mle->maybe_map); - - ret = -EAGAIN; - goto next; } - clear_bit(node, mle->maybe_map); - if (node > dlm->node_num) - goto next; - - mlog(0, "dead node in map!\n"); - /* yuck. go back and re-contact all nodes - * in the vote_map, removing this node. */ - memset(mle->response_map, 0, - sizeof(mle->response_map)); + /* now blank out everything, as if we had never + * contacted anyone */ + memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); + memset(mle->response_map, 0, sizeof(mle->response_map)); + /* reset the vote_map to the current node_map */ + memcpy(mle->vote_map, mle->node_map, + sizeof(mle->node_map)); + /* put myself into the maybe map */ + if (mle->type != DLM_MLE_BLOCK) + set_bit(dlm->node_num, mle->maybe_map); } ret = -EAGAIN; -next: node = dlm_bitmap_diff_iter_next(&bdi, &sc); } return ret; @@ -1316,7 +1388,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; char *name; - unsigned int namelen; + unsigned int namelen, hash; int found, ret; int set_maybe; int dispatch_assert = 0; @@ -1331,6 +1403,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) name = request->name; namelen = request->namelen; + hash = dlm_lockid_hash(name, namelen); if (namelen > DLM_LOCKID_NAME_MAX) { response = DLM_IVBUFLEN; @@ -1339,7 +1412,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) way_up_top: spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_unlock(&dlm->spinlock); @@ -1465,15 +1538,12 @@ way_up_top: mlog_errno(-ENOMEM); goto send_response; } - spin_lock(&dlm->spinlock); - dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, - name, namelen); - spin_unlock(&dlm->spinlock); goto way_up_top; } // mlog(0, "this is second time thru, already allocated, " // "add the block.\n"); + dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); set_bit(request->node_idx, mle->maybe_map); list_add(&mle->list, &dlm->master_list); response = DLM_MASTER_RESP_NO; @@ -1556,6 +1626,8 @@ again: dlm_node_iter_init(nodemap, &iter); while ((to = dlm_node_iter_next(&iter)) >= 0) { int r = 0; + struct dlm_master_list_entry *mle = NULL; + mlog(0, "sending assert master to %d (%.*s)\n", to, namelen, lockname); memset(&assert, 0, sizeof(assert)); @@ -1580,7 +1652,15 @@ again: /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " "got %d.\n", namelen, lockname, to, r); - dlm_dump_lock_resources(dlm); + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + if (dlm_find_mle(dlm, &mle, (char *)lockname, + namelen)) { + dlm_print_one_mle(mle); + __dlm_put_mle(mle); + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); BUG(); } else if (r == EAGAIN) { mlog(0, "%.*s: node %u create mles on other " @@ -1612,7 +1692,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; struct dlm_lock_resource *res = NULL; char *name; - unsigned int namelen; + unsigned int namelen, hash; u32 flags; int master_request = 0; int ret = 0; @@ -1622,6 +1702,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) name = assert->name; namelen = assert->namelen; + hash = dlm_lockid_hash(name, namelen); flags = be32_to_cpu(assert->flags); if (namelen > DLM_LOCKID_NAME_MAX) { @@ -1646,7 +1727,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) if (bit >= O2NM_MAX_NODES) { /* not necessarily an error, though less likely. * could be master just re-asserting. */ - mlog(ML_ERROR, "no bits set in the maybe_map, but %u " + mlog(0, "no bits set in the maybe_map, but %u " "is asserting! (%.*s)\n", assert->node_idx, namelen, name); } else if (bit != assert->node_idx) { @@ -1658,19 +1739,36 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) * number winning the mastery will respond * YES to mastery requests, but this node * had no way of knowing. let it pass. */ - mlog(ML_ERROR, "%u is the lowest node, " + mlog(0, "%u is the lowest node, " "%u is asserting. (%.*s) %u must " "have begun after %u won.\n", bit, assert->node_idx, namelen, name, bit, assert->node_idx); } } + if (mle->type == DLM_MLE_MIGRATION) { + if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { + mlog(0, "%s:%.*s: got cleanup assert" + " from %u for migration\n", + dlm->name, namelen, name, + assert->node_idx); + } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) { + mlog(0, "%s:%.*s: got unrelated assert" + " from %u for migration, ignoring\n", + dlm->name, namelen, name, + assert->node_idx); + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + goto done; + } + } } spin_unlock(&dlm->master_lock); /* ok everything checks out with the MLE * now check to see if there is a lockres */ - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { @@ -1679,7 +1777,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) goto kill; } if (!mle) { - if (res->owner != assert->node_idx) { + if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN && + res->owner != assert->node_idx) { mlog(ML_ERROR, "assert_master from " "%u, but current owner is " "%u! (%.*s)\n", @@ -1732,6 +1831,7 @@ ok: if (mle) { int extra_ref = 0; int nn = -1; + int rr, err = 0; spin_lock(&mle->spinlock); if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) @@ -1751,27 +1851,64 @@ ok: wake_up(&mle->wq); spin_unlock(&mle->spinlock); - if (mle->type == DLM_MLE_MIGRATION && res) { - mlog(0, "finishing off migration of lockres %.*s, " - "from %u to %u\n", - res->lockname.len, res->lockname.name, - dlm->node_num, mle->new_master); + if (res) { spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_MIGRATING; - dlm_change_lockres_owner(dlm, res, mle->new_master); - BUG_ON(res->state & DLM_LOCK_RES_DIRTY); + if (mle->type == DLM_MLE_MIGRATION) { + mlog(0, "finishing off migration of lockres %.*s, " + "from %u to %u\n", + res->lockname.len, res->lockname.name, + dlm->node_num, mle->new_master); + res->state &= ~DLM_LOCK_RES_MIGRATING; + dlm_change_lockres_owner(dlm, res, mle->new_master); + BUG_ON(res->state & DLM_LOCK_RES_DIRTY); + } else { + dlm_change_lockres_owner(dlm, res, mle->master); + } spin_unlock(&res->spinlock); } - /* master is known, detach if not already detached */ - dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); - + + /* master is known, detach if not already detached. + * ensures that only one assert_master call will happen + * on this mle. */ + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + + rr = atomic_read(&mle->mle_refs.refcount); + if (mle->inuse > 0) { + if (extra_ref && rr < 3) + err = 1; + else if (!extra_ref && rr < 2) + err = 1; + } else { + if (extra_ref && rr < 2) + err = 1; + else if (!extra_ref && rr < 1) + err = 1; + } + if (err) { + mlog(ML_ERROR, "%s:%.*s: got assert master from %u " + "that will mess up this node, refs=%d, extra=%d, " + "inuse=%d\n", dlm->name, namelen, name, + assert->node_idx, rr, extra_ref, mle->inuse); + dlm_print_one_mle(mle); + } + list_del_init(&mle->list); + __dlm_mle_detach_hb_events(dlm, mle); + __dlm_put_mle(mle); if (extra_ref) { /* the assert master message now balances the extra * ref given by the master / migration request message. * if this is the last put, it will be removed * from the list. */ - dlm_put_mle(mle); + __dlm_put_mle(mle); + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + } else if (res) { + if (res->owner != assert->node_idx) { + mlog(0, "assert_master from %u, but current " + "owner is %u (%.*s), no mle\n", assert->node_idx, + res->owner, namelen, name); } } @@ -1788,12 +1925,12 @@ done: kill: /* kill the caller! */ + mlog(ML_ERROR, "Bad message received from another node. Dumping state " + "and killing the other node now! This node is OK and can continue.\n"); + __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); dlm_lockres_put(res); - mlog(ML_ERROR, "Bad message received from another node. Dumping state " - "and killing the other node now! This node is OK and can continue.\n"); - dlm_dump_lock_resources(dlm); dlm_put(dlm); return -EINVAL; } @@ -1866,6 +2003,23 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) } } + /* + * If we're migrating this lock to someone else, we are no + * longer allowed to assert out own mastery. OTOH, we need to + * prevent migration from starting while we're still asserting + * our dominance. The reserved ast delays migration. + */ + spin_lock(&res->spinlock); + if (res->state & DLM_LOCK_RES_MIGRATING) { + mlog(0, "Someone asked us to assert mastery, but we're " + "in the middle of migration. Skipping assert, " + "the new master will handle that.\n"); + spin_unlock(&res->spinlock); + goto put; + } else + __dlm_lockres_reserve_ast(res); + spin_unlock(&res->spinlock); + /* this call now finishes out the nodemap * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", @@ -1878,6 +2032,10 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) mlog_errno(ret); } + /* Ok, we've asserted ourselves. Let's let migration start. */ + dlm_lockres_release_ast(dlm, res); + +put: dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n"); @@ -1916,6 +2074,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, BUG(); /* host is down, so answer for that node would be * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ + ret = 0; } if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { @@ -2117,7 +2276,7 @@ fail: * take both dlm->spinlock and dlm->master_lock */ spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); - dlm_get_mle(mle); + dlm_get_mle_inuse(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); @@ -2134,7 +2293,10 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + spin_unlock(&res->spinlock); goto leave; } @@ -2164,8 +2326,8 @@ fail: /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { - mlog(0, "%s:%.*s: expected migration target %u " - "is no longer up. restarting.\n", + mlog(0, "%s:%.*s: expected migration " + "target %u is no longer up, restarting\n", dlm->name, res->lockname.len, res->lockname.name, target); ret = -ERESTARTSYS; @@ -2175,7 +2337,10 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + spin_unlock(&res->spinlock); goto leave; } /* TODO: if node died: stop, clean up, return error */ @@ -2191,7 +2356,7 @@ fail: /* master is known, detach if not already detached */ dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); ret = 0; dlm_lockres_calc_usage(dlm, res); @@ -2462,7 +2627,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *oldmle = NULL; const char *name; - unsigned int namelen; + unsigned int namelen, hash; int ret = 0; if (!dlm_grab(dlm)) @@ -2470,6 +2635,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) name = migrate->name; namelen = migrate->namelen; + hash = dlm_lockid_hash(name, namelen); /* preallocate.. if this fails, abort */ mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, @@ -2482,7 +2648,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) /* check for pre-existing lock */ spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); spin_lock(&dlm->master_lock); if (res) { @@ -2580,6 +2746,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* remove it from the list so that only one * mle will be found */ list_del_init(&tmp->list); + __dlm_mle_detach_hb_events(dlm, mle); } spin_unlock(&tmp->spinlock); } @@ -2601,6 +2768,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) struct list_head *iter, *iter2; struct dlm_master_list_entry *mle; struct dlm_lock_resource *res; + unsigned int hash; mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); top: @@ -2673,19 +2841,21 @@ top: /* remove from the list early. NOTE: unlinking * list_head while in list_for_each_safe */ + __dlm_mle_detach_hb_events(dlm, mle); spin_lock(&mle->spinlock); list_del_init(&mle->list); atomic_set(&mle->woken, 1); spin_unlock(&mle->spinlock); wake_up(&mle->wq); - mlog(0, "node %u died during migration from " - "%u to %u!\n", dead_node, + mlog(0, "%s: node %u died during migration from " + "%u to %u!\n", dlm->name, dead_node, mle->master, mle->new_master); /* if there is a lockres associated with this * mle, find it and set its owner to UNKNOWN */ + hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len); res = __dlm_lookup_lockres(dlm, mle->u.name.name, - mle->u.name.len); + mle->u.name.len, hash); if (res) { /* unfortunately if we hit this rare case, our * lock ordering is messed. we need to drop