ocfs2: Increment the reference count of an already-active stack.

diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 0ff934874942a423e19478331423408017db0dc3..83a9f2972ac8189ca581ac9237ecf94a590b6f0d 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -53,7 +53,9 @@
 #define MLOG_MASK_PREFIX ML_DLM
 #include "cluster/masklog.h"
 
-static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
+static struct kmem_cache *dlm_lock_cache = NULL;
+
+static DEFINE_SPINLOCK(dlm_cookie_lock);
 static u64 dlm_next_cookie = 1;
 
 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
@@ -64,6 +66,22 @@ static void dlm_init_lock(struct dlm_lock *newlock, int type,
 static void dlm_lock_release(struct kref *kref);
 static void dlm_lock_detach_lockres(struct dlm_lock *lock);
 
+int dlm_init_lock_cache(void)
+{
+       dlm_lock_cache = kmem_cache_create("o2dlm_lock",
+                                          sizeof(struct dlm_lock),
+                                          0, SLAB_HWCACHE_ALIGN, NULL);
+       if (dlm_lock_cache == NULL)
+               return -ENOMEM;
+       return 0;
+}
+
+void dlm_destroy_lock_cache(void)
+{
+       if (dlm_lock_cache)
+               kmem_cache_destroy(dlm_lock_cache);
+}
+
 /* Tell us whether we can grant a new lock request.
  * locking:
  *   caller needs:  res->spinlock
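
The new slab cache gives struct dlm_lock allocations their own pool, which buys
cache-line alignment and per-cache slab statistics. A minimal sketch of how the
two helpers would be wired up, assuming the usual dlm_init()/dlm_exit() module
hooks in dlmdomain.c (the call sites are not part of this hunk):

static int __init dlm_init(void)
{
	if (dlm_init_lock_cache()) {
		mlog(ML_ERROR, "Could not create o2dlm_lock slab cache\n");
		return -ENOMEM;
	}
	/* ... register domain handlers, proc entries, etc. ... */
	return 0;
}

static void __exit dlm_exit(void)
{
	dlm_destroy_lock_cache();
}
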
@@ -163,6 +181,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
                        kick_thread = 1;
                }
        }
+       /* reduce the inflight count, this may result in the lockres
+        * being purged below during calc_usage */
+       if (lock->ml.node == dlm->node_num)
+               dlm_lockres_drop_inflight_ref(dlm, res);
 
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);
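
The inflight reference keeps a freshly looked-up lockres from being purged
before its first lock attempt has run to completion; dropping it here re-arms
purging for the local node's locks. A sketch of the shape of that accounting,
assuming a simple counter guarded by res->spinlock (the real helper lives in
dlmmaster.c):

/* sketch: assumed shape of the inflight accounting */
void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
				   struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	BUG_ON(res->inflight_locks == 0);	/* must pair with a prior grab */
	res->inflight_locks--;
	/* once this hits zero, dlm_lockres_calc_usage() is free to
	 * purge the lockres when it is otherwise unused */
	wake_up(&res->wq);
}
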
@@ -227,7 +249,16 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        lock->lock_pending = 0;
        if (status != DLM_NORMAL) {
-               if (status != DLM_NOTQUEUED) {
+               if (status == DLM_RECOVERING &&
+                   dlm_is_recovery_lock(res->lockname.name,
+                                        res->lockname.len)) {
+                       /* recovery lock was mastered by dead node.
+                        * we need to have calc_usage shoot down this
+                        * lockres and completely remaster it. */
+                       mlog(0, "%s: recovery lock was owned by "
+                            "dead node %u, remaster it now.\n",
+                            dlm->name, res->owner);
+               } else if (status != DLM_NOTQUEUED) {
                        /*
                         * DO NOT call calc_usage, as this would unhash
                         * the remote lockres before we ever get to use
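
The special case hinges on dlm_is_recovery_lock(), which is essentially a name
comparison against the well-known $RECOVERY lock. Sketched from dlmcommon.h
(treat the exact constant names as assumptions):

static inline int dlm_is_recovery_lock(const char *lock_name, int name_len)
{
	if (name_len == DLM_RECOVERY_LOCK_NAME_LEN &&
	    memcmp(lock_name, DLM_RECOVERY_LOCK_NAME, name_len) == 0)
		return 1;
	return 0;
}
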
@@ -290,7 +321,7 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
        if (tmpret >= 0) {
                // successfully sent and received
                ret = status;  // this is already a dlm_status
-               if (ret == DLM_RECOVERING) {
+               if (ret == DLM_REJECTED) {
                        mlog(ML_ERROR, "%s:%.*s: BUG.  this is a stale lockres "
                             "no longer owned by %u.  that node is coming back "
                             "up currently.\n", dlm->name, create.namelen,
@@ -340,7 +371,7 @@ static void dlm_lock_release(struct kref *kref)
                mlog(0, "freeing kernel-allocated lksb\n");
                kfree(lock->lksb);
        }
-       kfree(lock);
+       kmem_cache_free(dlm_lock_cache, lock);
 }
 
 /* associate a lock with it's lockres, getting a ref on the lockres */
@@ -399,13 +430,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
        struct dlm_lock *lock;
        int kernel_allocated = 0;
 
-       lock = kcalloc(1, sizeof(*lock), GFP_KERNEL);
+       lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
        if (!lock)
                return NULL;
 
        if (!lksb) {
                /* zero memory only if kernel-allocated */
-               lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL);
+               lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
                if (!lksb) {
-                       kfree(lock);
+                       kmem_cache_free(dlm_lock_cache, lock);
                        return NULL;
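
Two details here: memory taken from a kmem_cache has to go back through
kmem_cache_free() (hence the matching changes in dlm_lock_release() and the
lksb error path), and moving from GFP_KERNEL to GFP_NOFS keeps memory reclaim
from re-entering the filesystem while the DLM allocates on its behalf. The
pairing in miniature (a sketch):

/* sketch: slab objects must be returned to their cache */
struct dlm_lock *lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
if (lock)
	kmem_cache_free(dlm_lock_cache, lock);	/* not kfree() */
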
@@ -428,7 +459,8 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
  *   held on exit:  none
  * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
  */
-int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+                           void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
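
The extra ret_data argument reflects the o2net handler API growing a
post-processing hook: a handler may pass a pointer out through *ret_data for an
optional post handler that runs after the reply is sent. A sketch of the
registration as it would look in dlmdomain.c; the argument list is an
assumption based on the o2net API of this era:

/* sketch: registering against the extended o2net handler API */
status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
				sizeof(struct dlm_create_lock),
				dlm_create_lock_handler,
				dlm, NULL /* no post handler needed */,
				&dlm->dlm_domain_handlers);
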
@@ -448,7 +480,7 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
 
        name = create->name;
        namelen = create->namelen;
-       status = DLM_RECOVERING;
+       status = DLM_REJECTED;
        if (!dlm_domain_fully_joined(dlm)) {
                mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
                     "sending a create_lock message for lock %.*s!\n",
@@ -531,8 +563,8 @@ static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
 
 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
                        struct dlm_lockstatus *lksb, int flags,
-                       const char *name, dlm_astlockfunc_t *ast, void *data,
-                       dlm_bastlockfunc_t *bast)
+                       const char *name, int namelen, dlm_astlockfunc_t *ast,
+                       void *data, dlm_bastlockfunc_t *bast)
 {
        enum dlm_status status;
        struct dlm_lock_resource *res = NULL;
@@ -562,7 +594,7 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
        recovery = (flags & LKM_RECOVERY);
 
        if (recovery &&
-           (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) {
+           (!dlm_is_recovery_lock(name, namelen) || convert) ) {
                dlm_error(status);
                goto error;
        }
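
Taking namelen explicitly lets callers pass lock names that are not
NUL-terminated and drops the repeated strlen() calls below. A caller now looks
roughly like this (my_ast/my_bast are hypothetical callbacks):

/* sketch: calling the new dlmlock() with an explicit name length */
status = dlmlock(dlm, LKM_EXMODE, &lksb, flags,
		 name, namelen, my_ast, my_astdata, my_bast);
if (status != DLM_NORMAL)
	dlm_error(status);
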
@@ -634,7 +666,7 @@ retry_convert:
                }
 
                status = DLM_IVBUFLEN;
-               if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) {
+               if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
                        dlm_error(status);
                        goto error;
                }
@@ -650,7 +682,7 @@ retry_convert:
                        dlm_wait_for_recovery(dlm);
 
                /* find or create the lock resource */
-               res = dlm_get_lock_resource(dlm, name, flags);
+               res = dlm_get_lock_resource(dlm, name, namelen, flags);
                if (!res) {
                        status = DLM_IVLOCKID;
                        dlm_error(status);
@@ -691,18 +723,22 @@ retry_lock:
                        msleep(100);
                        /* no waiting for dlm_reco_thread */
                        if (recovery) {
-                               if (status == DLM_RECOVERING) {
-                                       mlog(0, "%s: got RECOVERING "
-                                            "for $REOCVERY lock, master "
-                                            "was %u\n", dlm->name, 
-                                            res->owner);
-                                       dlm_wait_for_node_death(dlm, res->owner, 
-                                                       DLM_NODE_DEATH_WAIT_MAX);
-                               }
+                               if (status != DLM_RECOVERING)
+                                       goto retry_lock;
+
+                               mlog(0, "%s: got RECOVERING "
+                                    "for $RECOVERY lock, master "
+                                    "was %u\n", dlm->name,
+                                    res->owner);
+                               /* wait to see the node go down, then
+                                * drop down and allow the lockres to
+                                * get cleaned up.  need to remaster. */
+                               dlm_wait_for_node_death(dlm, res->owner,
+                                               DLM_NODE_DEATH_WAIT_MAX);
                        } else {
                                dlm_wait_for_recovery(dlm);
+                               goto retry_lock;
                        }
-                       goto retry_lock;
                }
 
                if (status != DLM_NORMAL) {
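
The rewrite inverts the old test so that only the paths that can make progress
loop back to retry_lock; a $RECOVERY request that sees DLM_RECOVERING from a
dead master deliberately falls through, so dlm_lockres_calc_usage() can purge
the lockres and it can be remastered. Compressed to its control flow (a
sketch):

/* sketch: the retry decision after this hunk */
if (recovery) {
	if (status != DLM_RECOVERING)
		goto retry_lock;	/* MIGRATING/FORWARD: just retry */
	/* $RECOVERY lock and the master died: wait for the death to be
	 * observed, then fall through so the lockres can be purged */
	dlm_wait_for_node_death(dlm, res->owner, DLM_NODE_DEATH_WAIT_MAX);
} else {
	dlm_wait_for_recovery(dlm);
	goto retry_lock;
}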