ocfs2: special case recovery lock in dlmlock_remote()
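For orientation before the diff: the first three hunks make dlmlock_remote() track whether the remote request actually changed the lockres and only then call dlm_lockres_calc_usage(); the one exception is a $RECOVERY lockres whose master died, where calc_usage is deliberately allowed to run so the stale lockres gets shot down and remastered. Below is a minimal standalone sketch of that decision; the enum values and the is_recovery_lock() helper are simplified stand-ins for the real o2dlm definitions, not kernel code.

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-ins for the o2dlm definitions used by the patch. */
enum dlm_status { DLM_NORMAL, DLM_NOTQUEUED, DLM_RECOVERING, DLM_DENIED };

#define RECOVERY_LOCK_NAME "$RECOVERY"

static bool is_recovery_lock(const char *name, unsigned int len)
{
	return len == strlen(RECOVERY_LOCK_NAME) &&
	       !memcmp(name, RECOVERY_LOCK_NAME, len);
}

/*
 * Mirrors the new "lockres_changed" flag in dlmlock_remote(): should
 * dlm_lockres_calc_usage() still run after the remote lock request?
 */
static bool lockres_changed(enum dlm_status status,
			    const char *name, unsigned int len)
{
	if (status == DLM_NORMAL)
		return true;	/* lock granted: usage really changed */
	if (status == DLM_RECOVERING && is_recovery_lock(name, len))
		return true;	/* dead master: let calc_usage purge and remaster */
	if (status == DLM_NOTQUEUED)
		return true;	/* expected trylock failure, handled as before */
	return false;		/* other errors: do not unhash the remote lockres */
}

int main(void)
{
	printf("%d\n", lockres_changed(DLM_RECOVERING, "$RECOVERY", 9)); /* 1 */
	printf("%d\n", lockres_changed(DLM_DENIED, "$RECOVERY", 9));     /* 0 */
	printf("%d\n", lockres_changed(DLM_NOTQUEUED, "somelock", 8));   /* 1 */
	return 0;
}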
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 55cda25ae11b13563d8106e53d7536f98793a808..20b38dc18736f49a46cf7e511c5d6e9a9dacd0d0 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -201,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
                                      struct dlm_lock *lock, int flags)
 {
        enum dlm_status status = DLM_DENIED;
+       int lockres_changed = 1;
 
        mlog_entry("type=%d\n", lock->ml.type);
        mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
@@ -226,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        lock->lock_pending = 0;
        if (status != DLM_NORMAL) {
-               if (status != DLM_NOTQUEUED)
+               if (status == DLM_RECOVERING &&
+                   dlm_is_recovery_lock(res->lockname.name,
+                                        res->lockname.len)) {
+                       /* recovery lock was mastered by dead node.
+                        * we need to have calc_usage shoot down this
+                        * lockres and completely remaster it. */
+                       mlog(0, "%s: recovery lock was owned by "
+                            "dead node %u, remaster it now.\n",
+                            dlm->name, res->owner);
+               } else if (status != DLM_NOTQUEUED) {
+                       /*
+                        * DO NOT call calc_usage, as this would unhash
+                        * the remote lockres before we ever get to use
+                        * it.  treat as if we never made any change to
+                        * the lockres.
+                        */
+                       lockres_changed = 0;
                        dlm_error(status);
+               }
                dlm_revert_pending_lock(res, lock);
                dlm_lock_put(lock);
        } else if (dlm_is_recovery_lock(res->lockname.name, 
@@ -243,7 +261,8 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
        }
        spin_unlock(&res->spinlock);
 
-       dlm_lockres_calc_usage(dlm, res);
+       if (lockres_changed)
+               dlm_lockres_calc_usage(dlm, res);
 
        wake_up(&res->wq);
        return status;
@@ -280,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
        if (tmpret >= 0) {
                // successfully sent and received
                ret = status;  // this is already a dlm_status
+               if (ret == DLM_RECOVERING) {
+                       mlog(ML_ERROR, "%s:%.*s: BUG.  this is a stale lockres "
+                            "no longer owned by %u.  that node is coming back "
+                            "up currently.\n", dlm->name, create.namelen,
+                            create.name, res->owner);
+                       dlm_print_one_lock_resource(res);
+                       BUG();
+               }
        } else {
                mlog_errno(tmpret);
                if (dlm_is_host_down(tmpret)) {
@@ -428,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
        if (!dlm_grab(dlm))
                return DLM_REJECTED;
 
-       mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
-                       "Domain %s not fully joined!\n", dlm->name);
-
        name = create->name;
        namelen = create->namelen;
+       status = DLM_RECOVERING;
+       if (!dlm_domain_fully_joined(dlm)) {
+               mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
+                    "sending a create_lock message for lock %.*s!\n",
+                    dlm->name, create->node_idx, namelen, name);
+               dlm_error(status);
+               goto leave;
+       }
 
        status = DLM_IVBUFLEN;
        if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -668,18 +700,22 @@ retry_lock:
                        msleep(100);
                        /* no waiting for dlm_reco_thread */
                        if (recovery) {
-                               if (status == DLM_RECOVERING) {
-                                       mlog(0, "%s: got RECOVERING "
-                                            "for $REOCVERY lock, master "
-                                            "was %u\n", dlm->name, 
-                                            res->owner);
-                                       dlm_wait_for_node_death(dlm, res->owner, 
-                                                       DLM_NODE_DEATH_WAIT_MAX);
-                               }
+                               if (status != DLM_RECOVERING)
+                                       goto retry_lock;
+
+                               mlog(0, "%s: got RECOVERING "
+                                    "for $RECOVERY lock, master "
+                                    "was %u\n", dlm->name,
+                                    res->owner);
+                               /* wait to see the node go down, then
+                                * drop down and allow the lockres to
+                                * get cleaned up.  need to remaster. */
+                               dlm_wait_for_node_death(dlm, res->owner,
+                                               DLM_NODE_DEATH_WAIT_MAX);
                        } else {
                                dlm_wait_for_recovery(dlm);
+                               goto retry_lock;
                        }
-                       goto retry_lock;
                }
 
                if (status != DLM_NORMAL) {
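
The final hunk changes the retry policy in dlmlock() for these transient statuses. A compact sketch of the new behaviour, using a hypothetical retry_wanted() helper to stand in for the real "goto retry_lock" versus fall-through control flow (the enum is again a simplified stand-in):

#include <stdbool.h>
#include <stdio.h>

enum dlm_status { DLM_NORMAL, DLM_RECOVERING, DLM_MIGRATING, DLM_FORWARD };

/*
 * Hypothetical helper: should dlmlock() jump back to retry_lock?
 * "recovery" means the request is for the $RECOVERY lockres itself.
 */
static bool retry_wanted(bool recovery, enum dlm_status status)
{
	if (!recovery)
		return true;	/* ordinary lock: wait for recovery, then retry */
	if (status != DLM_RECOVERING)
		return true;	/* $RECOVERY lock, other transient status: retry */
	/*
	 * $RECOVERY lock and the (dead) master answered DLM_RECOVERING:
	 * wait for the node death to be observed, then fall through so the
	 * stale lockres can be purged and remastered instead of retrying
	 * against the old owner.
	 */
	return false;
}

int main(void)
{
	printf("%d\n", retry_wanted(true, DLM_RECOVERING));  /* 0: fall through, remaster */
	printf("%d\n", retry_wanted(true, DLM_MIGRATING));   /* 1: retry */
	printf("%d\n", retry_wanted(false, DLM_RECOVERING)); /* 1: wait, then retry */
	return 0;
}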