static int dlm_recovery_thread(void *data);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
static int dlm_do_recovery(struct dlm_ctxt *dlm);
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
u8 send_to,
struct dlm_lock_resource *res,
int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 *real_master);
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 nodenum, u8 *real_master);
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
u8 dead_node, u8 send_to);
return c;
}
+static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
+ u8 dead_node)
+{
+ assert_spin_locked(&dlm->spinlock);
+ if (dlm->reco.dead_node != dead_node)
+ mlog(0, "%s: changing dead_node from %u to %u\n",
+ dlm->name, dlm->reco.dead_node, dead_node);
+ dlm->reco.dead_node = dead_node;
+}
+
+static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
+ u8 master)
+{
+ assert_spin_locked(&dlm->spinlock);
+ mlog(0, "%s: changing new_master from %u to %u\n",
+ dlm->name, dlm->reco.new_master, master);
+ dlm->reco.new_master = master;
+}
+
static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
{
spin_lock(&dlm->spinlock);
clear_bit(dlm->reco.dead_node, dlm->recovery_map);
- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
- dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
spin_unlock(&dlm->spinlock);
}
* RECOVERY THREAD
*/
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
{
/* wake the recovery thread
* this will wake the reco thread in one of three places
{
int dead;
spin_lock(&dlm->spinlock);
- dead = test_bit(node, dlm->domain_map);
+ dead = !test_bit(node, dlm->domain_map);
spin_unlock(&dlm->spinlock);
return dead;
}
mlog(0, "new master %u died while recovering %u!\n",
dlm->reco.new_master, dlm->reco.dead_node);
/* unset the new_master, leave dead_node */
- dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
}
/* select a target to recover */
bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
if (bit >= O2NM_MAX_NODES || bit < 0)
- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
else
- dlm->reco.dead_node = bit;
+ dlm_set_reco_dead_node(dlm, bit);
} else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
/* BUG? */
mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
dlm->reco.dead_node);
- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
}
if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
if (!dlm_grab(dlm))
return -EINVAL;
+ if (lr->dead_node != dlm->reco.dead_node) {
+ mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
+ "dead_node is %u\n", dlm->name, lr->node_idx,
+ lr->dead_node, dlm->reco.dead_node);
+ /* this is a hack */
+ dlm_put(dlm);
+ return -ENOMEM;
+ }
BUG_ON(lr->dead_node != dlm->reco.dead_node);
item = kcalloc(1, sizeof(*item), GFP_KERNEL);
dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
dead_node, reco_master);
mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
- "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
+ "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
dlm->name, mres->lockname_len, mres->lockname, mres->master,
mres->num_locks, mres->total_locks, mres->flags,
- mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
+ dlm_get_lock_cookie_node(mres->ml[0].cookie),
+ dlm_get_lock_cookie_seq(mres->ml[0].cookie),
+ mres->ml[0].list, mres->ml[0].flags,
mres->ml[0].type, mres->ml[0].convert_type,
mres->ml[0].highest_blocked, mres->ml[0].node);
BUG();
mlog(0, "found lockres owned by dead node while "
"doing recovery for node %u. sending it.\n",
dead_node);
- list_del_init(&res->recovering);
- list_add_tail(&res->recovering, list);
+ list_move_tail(&res->recovering, list);
} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "found UNKNOWN owner while doing recovery "
"for node %u. sending it.\n", dead_node);
- list_del_init(&res->recovering);
- list_add_tail(&res->recovering, list);
+ list_move_tail(&res->recovering, list);
}
}
spin_unlock(&dlm->spinlock);
ml->type == LKM_PRMODE) {
/* if it is already set, this had better be a PR
* and it has to match */
- if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
- memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+ if (!dlm_lvb_is_empty(mres->lvb) &&
+ (ml->type == LKM_EXMODE ||
+ memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
mlog(ML_ERROR, "mismatched lvbs!\n");
__dlm_print_one_lock_resource(lock->lockres);
BUG();
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 *real_master)
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res, u8 *real_master)
{
struct dlm_node_iter iter;
int nodenum;
ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
if (ret < 0) {
mlog_errno(ret);
- BUG();
- /* TODO: need to figure a way to restart this */
+ if (!dlm_is_host_down(ret))
+ BUG();
+ /* host is down, so answer for that node would be
+ * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
}
if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "lock master is %u\n", *real_master);
}
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+ u8 nodenum, u8 *real_master)
{
int ret = -EINVAL;
struct dlm_master_requery req;
struct dlm_ctxt *dlm = data;
struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
struct dlm_lock_resource *res = NULL;
+ unsigned int hash;
int master = DLM_LOCK_RES_OWNER_UNKNOWN;
u32 flags = DLM_ASSERT_MASTER_REQUERY;
return master;
}
+ hash = dlm_lockid_hash(req->name, req->namelen);
+
spin_lock(&dlm->spinlock);
- res = __dlm_lookup_lockres(dlm, req->name, req->namelen);
+ res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
if (res) {
spin_lock(&res->spinlock);
master = res->owner;
struct dlm_lock *newlock = NULL;
struct dlm_lockstatus *lksb = NULL;
int ret = 0;
- int i;
+ int i, bad;
struct list_head *iter;
struct dlm_lock *lock = NULL;
/* lock is always created locally first, and
* destroyed locally last. it must be on the list */
if (!lock) {
+ u64 c = ml->cookie;
mlog(ML_ERROR, "could not find local lock "
- "with cookie %"MLFu64"!\n",
- ml->cookie);
+ "with cookie %u:%llu!\n",
+ dlm_get_lock_cookie_node(c),
+ dlm_get_lock_cookie_seq(c));
BUG();
}
BUG_ON(lock->ml.node != ml->node);
/* move the lock to its proper place */
/* do not alter lock refcount. switching lists. */
- list_del_init(&lock->list);
- list_add_tail(&lock->list, queue);
+ list_move_tail(&lock->list, queue);
spin_unlock(&res->spinlock);
mlog(0, "just reordered a local lock!\n");
lksb->flags |= (ml->flags &
(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
- if (mres->lvb[0]) {
+ if (!dlm_lvb_is_empty(mres->lvb)) {
if (lksb->flags & DLM_LKSB_PUT_LVB) {
/* other node was trying to update
* lvb when node died. recreate the
* most recent valid lvb info */
BUG_ON(ml->type != LKM_EXMODE &&
ml->type != LKM_PRMODE);
- if (res->lvb[0] && (ml->type == LKM_EXMODE ||
- memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+ if (!dlm_lvb_is_empty(res->lvb) &&
+ (ml->type == LKM_EXMODE ||
+ memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
mlog(ML_ERROR, "received bad lvb!\n");
__dlm_print_one_lock_resource(res);
BUG();
* relative to each other, but clearly *not*
* preserved relative to locks from other nodes.
*/
+ bad = 0;
spin_lock(&res->spinlock);
- dlm_lock_get(newlock);
- list_add_tail(&newlock->list, queue);
+ list_for_each_entry(lock, queue, list) {
+ if (lock->ml.cookie == ml->cookie) {
+ u64 c = lock->ml.cookie;
+ mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
+ "exists on this lockres!\n", dlm->name,
+ res->lockname.len, res->lockname.name,
+ dlm_get_lock_cookie_node(c),
+ dlm_get_lock_cookie_seq(c));
+
+ mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
+ "node=%u, cookie=%u:%llu, queue=%d\n",
+ ml->type, ml->convert_type, ml->node,
+ dlm_get_lock_cookie_node(ml->cookie),
+ dlm_get_lock_cookie_seq(ml->cookie),
+ ml->list);
+
+ __dlm_print_one_lock_resource(res);
+ bad = 1;
+ break;
+ }
+ }
+ if (!bad) {
+ dlm_lock_get(newlock);
+ list_add_tail(&newlock->list, queue);
+ }
spin_unlock(&res->spinlock);
}
mlog(0, "done running all the locks\n");
u8 dead_node, u8 new_master)
{
int i;
- struct list_head *iter, *iter2, *bucket;
+ struct list_head *iter, *iter2;
+ struct hlist_node *hash_iter;
+ struct hlist_head *bucket;
+
struct dlm_lock_resource *res;
mlog_entry_void();
* for now we need to run the whole hash, clear
* the RECOVERING state and set the owner
* if necessary */
- for (i=0; i<DLM_HASH_SIZE; i++) {
- bucket = &(dlm->resources[i]);
- list_for_each(iter, bucket) {
- res = list_entry (iter, struct dlm_lock_resource, list);
+ for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+ bucket = dlm_lockres_hash(dlm, i);
+ hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
if (res->state & DLM_LOCK_RES_RECOVERING) {
if (res->owner == dead_node) {
mlog(0, "(this=%u) res %.*s owner=%u "
} else
continue;
+ if (!list_empty(&res->recovering)) {
+ mlog(0, "%s:%.*s: lockres was "
+ "marked RECOVERING, owner=%u\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, res->owner);
+ list_del_init(&res->recovering);
+ }
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
- struct list_head *iter;
+ struct hlist_node *iter;
struct dlm_lock_resource *res;
int i;
- struct list_head *bucket;
+ struct hlist_head *bucket;
struct dlm_lock *lock;
* can be kicked again to see if any ASTs or BASTs
* need to be fired as a result.
*/
- for (i=0; i<DLM_HASH_SIZE; i++) {
- bucket = &(dlm->resources[i]);
- list_for_each(iter, bucket) {
- res = list_entry (iter, struct dlm_lock_resource, list);
+ for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+ bucket = dlm_lockres_hash(dlm, i);
+ hlist_for_each_entry(res, iter, bucket, hash_node) {
/* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name,
/* set the new_master to this node */
spin_lock(&dlm->spinlock);
- dlm->reco.new_master = dlm->node_num;
+ dlm_set_reco_master(dlm, dlm->node_num);
spin_unlock(&dlm->spinlock);
}
"node %u changing it to %u\n", dlm->name,
dlm->reco.dead_node, br->node_idx, br->dead_node);
}
- dlm->reco.new_master = br->node_idx;
- dlm->reco.dead_node = br->dead_node;
+ dlm_set_reco_master(dlm, br->node_idx);
+ dlm_set_reco_dead_node(dlm, br->dead_node);
if (!test_bit(br->dead_node, dlm->recovery_map)) {
mlog(0, "recovery master %u sees %u as dead, but this "
"node has not yet. marking %u as dead\n",
mlog(0, "%u not in domain/live_nodes map "
"so setting it in reco map manually\n",
br->dead_node);
- set_bit(br->dead_node, dlm->recovery_map);
+ /* force the recovery cleanup in __dlm_hb_node_down
+ * both of these will be cleared in a moment */
+ set_bit(br->dead_node, dlm->domain_map);
+ set_bit(br->dead_node, dlm->live_nodes_map);
__dlm_hb_node_down(dlm, br->dead_node);
}
spin_unlock(&dlm->spinlock);