#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
-#include <linux/crc32.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
-#include <cluster/heartbeat.h>
-#include <cluster/nodemanager.h>
-#include <cluster/tcp.h>
-
-#include <dlm/dlmapi.h>
-
#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>
#include "ocfs2.h"
+#include "ocfs2_lockingver.h"
#include "alloc.h"
#include "dcache.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
+#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
-#include "vote.h"
#include "buffer_head_io.h"
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
+static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
/*
* Return value from ->downconvert_worker functions.
unsigned int line,
struct ocfs2_lock_res *lockres)
{
- struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ struct ocfs2_meta_lvb *lvb =
+ (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
mlog(level, "LVB information for %s (called from %s:%u):\n",
lockres->l_name, function, line);
struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
/*
- * Optionally called in the downconvert (or "vote") thread
- * after a successful downconvert. The lockres will not be
- * referenced after this callback is called, so it is safe to
- * free memory, etc.
+ * Optionally called in the downconvert thread after a
+ * successful downconvert. The lockres will not be referenced
+ * after this callback is called, so it is safe to free
+ * memory, etc.
*
* The exact semantics of when this is called are controlled
* by ->downconvert_worker()
.flags = 0,
};
-static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
+static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
.get_osb = ocfs2_get_inode_osb,
.check_downconvert = ocfs2_check_meta_downconvert,
.set_lvb = ocfs2_set_meta_lvb,
- .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
-};
-
-static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
- .get_osb = ocfs2_get_inode_osb,
.downconvert_worker = ocfs2_data_convert_worker,
- .flags = 0,
+ .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
.flags = 0,
};
+static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
+ .get_osb = ocfs2_get_file_osb,
+ .flags = 0,
+};
+
static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
return lockres->l_type == OCFS2_LOCK_TYPE_META ||
- lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
lockres->l_type == OCFS2_LOCK_TYPE_RW ||
lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}
static int ocfs2_lock_create(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
- int dlm_flags);
+ u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
int convert);
-#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \
- mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \
- "resource %s: %s\n", dlm_errname(_stat), _func, \
- _lockres->l_name, dlm_errmsg(_stat)); \
+#define ocfs2_log_dlm_error(_func, _err, _lockres) do { \
+ mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
+ _err, _func, _lockres->l_name); \
} while (0)
-static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
- struct ocfs2_lock_res *lockres);
-static int ocfs2_meta_lock_update(struct inode *inode,
+static int ocfs2_downconvert_thread(void *arg);
+static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres);
+static int ocfs2_inode_lock_update(struct inode *inode,
struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+ int new_level);
+static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int new_level,
+ int lvb,
+ unsigned int generation);
+static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres);
+static int ocfs2_cancel_convert(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres);
+
static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
u64 blkno,
res->l_ops = ops;
res->l_priv = priv;
- res->l_level = LKM_IVMODE;
- res->l_requested = LKM_IVMODE;
- res->l_blocking = LKM_IVMODE;
+ res->l_level = DLM_LOCK_IV;
+ res->l_requested = DLM_LOCK_IV;
+ res->l_blocking = DLM_LOCK_IV;
res->l_action = OCFS2_AST_INVALID;
res->l_unlock_action = OCFS2_UNLOCK_INVALID;
ops = &ocfs2_inode_rw_lops;
break;
case OCFS2_LOCK_TYPE_META:
- ops = &ocfs2_inode_meta_lops;
- break;
- case OCFS2_LOCK_TYPE_DATA:
- ops = &ocfs2_inode_data_lops;
+ ops = &ocfs2_inode_inode_lops;
break;
case OCFS2_LOCK_TYPE_OPEN:
ops = &ocfs2_inode_open_lops;
return OCFS2_SB(inode->i_sb);
}
+static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
+{
+ struct ocfs2_file_private *fp = lockres->l_priv;
+
+ return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
+}
+
static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
__be64 inode_blkno_be;
&ocfs2_rename_lops, osb);
}
+void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
+ struct ocfs2_file_private *fp)
+{
+ struct inode *inode = fp->fp_file->f_mapping->host;
+ struct ocfs2_inode_info *oi = OCFS2_I(inode);
+
+ ocfs2_lock_res_init_once(lockres);
+ ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
+ inode->i_generation, lockres->l_name);
+ ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
+ OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
+ fp);
+ lockres->l_flags |= OCFS2_LOCK_NOCACHE;
+}
+
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
mlog_entry_void();
BUG_ON(!lockres);
switch(level) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
lockres->l_ex_holders++;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
lockres->l_ro_holders++;
break;
default:
BUG_ON(!lockres);
switch(level) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
BUG_ON(!lockres->l_ex_holders);
lockres->l_ex_holders--;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
BUG_ON(!lockres->l_ro_holders);
lockres->l_ro_holders--;
break;
* lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
- int new_level = LKM_EXMODE;
+ int new_level = DLM_LOCK_EX;
- if (level == LKM_EXMODE)
- new_level = LKM_NLMODE;
- else if (level == LKM_PRMODE)
- new_level = LKM_PRMODE;
+ if (level == DLM_LOCK_EX)
+ new_level = DLM_LOCK_NL;
+ else if (level == DLM_LOCK_PR)
+ new_level = DLM_LOCK_PR;
return new_level;
}
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
- BUG_ON(lockres->l_blocking <= LKM_NLMODE);
+ BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
lockres->l_level = lockres->l_requested;
if (lockres->l_level <=
ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
- lockres->l_blocking = LKM_NLMODE;
+ lockres->l_blocking = DLM_LOCK_NL;
lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
}
lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
* information is already up to data. Convert from NL to
* *anything* however should mark ourselves as needing an
* update */
- if (lockres->l_level == LKM_NLMODE &&
+ if (lockres->l_level == DLM_LOCK_NL &&
lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
- if (lockres->l_requested > LKM_NLMODE &&
+ if (lockres->l_requested > DLM_LOCK_NL &&
!(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
return needs_downconvert;
}
+/*
+ * OCFS2_LOCK_PENDING and l_pending_gen.
+ *
+ * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
+ * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
+ * for more details on the race.
+ *
+ * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
+ * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
+ * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
+ * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
+ * the caller is going to try to clear PENDING again. If nothing else is
+ * happening, __lockres_clear_pending() sees PENDING is unset and does
+ * nothing.
+ *
+ * But what if another path (eg downconvert thread) has just started a
+ * new locking action? The other path has re-set PENDING. Our path
+ * cannot clear PENDING, because that will re-open the original race
+ * window.
+ *
+ * [Example]
+ *
+ * ocfs2_meta_lock()
+ * ocfs2_cluster_lock()
+ * set BUSY
+ * set PENDING
+ * drop l_lock
+ * ocfs2_dlm_lock()
+ * ocfs2_locking_ast() ocfs2_downconvert_thread()
+ * clear PENDING ocfs2_unblock_lock()
+ * take_l_lock
+ * !BUSY
+ * ocfs2_prepare_downconvert()
+ * set BUSY
+ * set PENDING
+ * drop l_lock
+ * take l_lock
+ * clear PENDING
+ * drop l_lock
+ * <window>
+ * ocfs2_dlm_lock()
+ *
+ * So as you can see, we now have a window where l_lock is not held,
+ * PENDING is not set, and ocfs2_dlm_lock() has not been called.
+ *
+ * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
+ * set by ocfs2_prepare_downconvert(). That wasn't nice.
+ *
+ * To solve this we introduce l_pending_gen. A call to
+ * lockres_clear_pending() will only do so when it is passed a generation
+ * number that matches the lockres. lockres_set_pending() will return the
+ * current generation number. When ocfs2_cluster_lock() goes to clear
+ * PENDING, it passes the generation it got from set_pending(). In our
+ * example above, the generation numbers will *not* match. Thus,
+ * ocfs2_cluster_lock() will not clear the PENDING set by
+ * ocfs2_prepare_downconvert().
+ */
+
+/* Unlocked version for ocfs2_locking_ast() */
+static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
+ unsigned int generation,
+ struct ocfs2_super *osb)
+{
+ assert_spin_locked(&lockres->l_lock);
+
+ /*
+ * The ast and locking functions can race us here. The winner
+ * will clear pending, the loser will not.
+ */
+ if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
+ (lockres->l_pending_gen != generation))
+ return;
+
+ lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
+ lockres->l_pending_gen++;
+
+ /*
+ * The downconvert thread may have skipped us because we
+ * were PENDING. Wake it up.
+ */
+ if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
+ ocfs2_wake_downconvert_thread(osb);
+}
+
+/* Locked version for callers of ocfs2_dlm_lock() */
+static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
+ unsigned int generation,
+ struct ocfs2_super *osb)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ __lockres_clear_pending(lockres, generation, osb);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+}
+
+static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
+{
+ assert_spin_locked(&lockres->l_lock);
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
+
+ lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
+
+ return lockres->l_pending_gen;
+}
+
+
static void ocfs2_blocking_ast(void *opaque, int level)
{
struct ocfs2_lock_res *lockres = opaque;
int needs_downconvert;
unsigned long flags;
- BUG_ON(level <= LKM_NLMODE);
+ BUG_ON(level <= DLM_LOCK_NL);
mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
lockres->l_name, level, lockres->l_level,
ocfs2_lock_type_string(lockres->l_type));
+ /*
+ * We can skip the bast for locks which don't enable caching -
+ * they'll be dropped at the earliest possible time anyway.
+ */
+ if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
+ return;
+
spin_lock_irqsave(&lockres->l_lock, flags);
needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
if (needs_downconvert)
wake_up(&lockres->l_event);
- ocfs2_kick_vote_thread(osb);
+ ocfs2_wake_downconvert_thread(osb);
}
static void ocfs2_locking_ast(void *opaque)
{
struct ocfs2_lock_res *lockres = opaque;
- struct dlm_lockstatus *lksb = &lockres->l_lksb;
+ struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
unsigned long flags;
+ int status;
spin_lock_irqsave(&lockres->l_lock, flags);
- if (lksb->status != DLM_NORMAL) {
- mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
- lockres->l_name, lksb->status);
+ status = ocfs2_dlm_lock_status(&lockres->l_lksb);
+
+ if (status == -EAGAIN) {
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+ goto out;
+ }
+
+ if (status) {
+ mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
+ lockres->l_name, status);
spin_unlock_irqrestore(&lockres->l_lock, flags);
return;
}
lockres->l_unlock_action);
BUG();
}
-
+out:
/* set it to something invalid so if we get called again we
* can catch it. */
lockres->l_action = OCFS2_AST_INVALID;
+ /* Did we try to cancel this lock? Clear that state */
+ if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
+ lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
+
+ /*
+ * We may have beaten the locking functions here. We certainly
+ * know that dlm_lock() has been called :-)
+ * Because we can't have two lock calls in flight at once, we
+ * can use lockres->l_pending_gen.
+ */
+ __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);
+
wake_up(&lockres->l_event);
spin_unlock_irqrestore(&lockres->l_lock, flags);
}
static int ocfs2_lock_create(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
- int dlm_flags)
+ u32 dlm_flags)
{
int ret = 0;
- enum dlm_status status = DLM_NORMAL;
unsigned long flags;
+ unsigned int gen;
mlog_entry_void();
- mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
+ mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
dlm_flags);
spin_lock_irqsave(&lockres->l_lock, flags);
lockres->l_action = OCFS2_AST_ATTACH;
lockres->l_requested = level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ gen = lockres_set_pending(lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
- status = dlmlock(osb->dlm,
- level,
- &lockres->l_lksb,
- dlm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- ocfs2_locking_ast,
- lockres,
- ocfs2_blocking_ast);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmlock", status, lockres);
- ret = -EINVAL;
+ ret = ocfs2_dlm_lock(osb->cconn,
+ level,
+ &lockres->l_lksb,
+ dlm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
+ lockres_clear_pending(lockres, gen, osb);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
}
- mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
+ mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
bail:
mlog_exit(ret);
}
+static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
+ struct ocfs2_lock_res *lockres)
+{
+ int ret;
+
+ ret = wait_for_completion_interruptible(&mw->mw_complete);
+ if (ret)
+ lockres_remove_mask_waiter(lockres, mw);
+ else
+ ret = mw->mw_status;
+ /* Re-arm the completion in case we want to wait on it again */
+ INIT_COMPLETION(mw->mw_complete);
+ return ret;
+}
+
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
- int lkm_flags,
+ u32 lkm_flags,
int arg_flags)
{
struct ocfs2_mask_waiter mw;
- enum dlm_status status;
int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
unsigned long flags;
+ unsigned int gen;
+ int noqueue_attempted = 0;
mlog_entry_void();
ocfs2_init_mask_waiter(&mw);
if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
- lkm_flags |= LKM_VALBLK;
+ lkm_flags |= DLM_LKF_VALBLK;
again:
wait = 0;
goto unlock;
}
- if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
- /* lock has not been created yet. */
- spin_unlock_irqrestore(&lockres->l_lock, flags);
-
- ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
- if (ret < 0) {
- mlog_errno(ret);
- goto out;
- }
- goto again;
- }
-
if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
!ocfs2_may_continue_on_blocked_lock(lockres, level)) {
/* is the lock is currently blocked on behalf of
}
if (level > lockres->l_level) {
+ if (noqueue_attempted > 0) {
+ ret = -EAGAIN;
+ goto unlock;
+ }
+ if (lkm_flags & DLM_LKF_NOQUEUE)
+ noqueue_attempted = 1;
+
if (lockres->l_action != OCFS2_AST_INVALID)
mlog(ML_ERROR, "lockres %s has action %u pending\n",
lockres->l_name, lockres->l_action);
- lockres->l_action = OCFS2_AST_CONVERT;
+ if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
+ lockres->l_action = OCFS2_AST_ATTACH;
+ lkm_flags &= ~DLM_LKF_CONVERT;
+ } else {
+ lockres->l_action = OCFS2_AST_CONVERT;
+ lkm_flags |= DLM_LKF_CONVERT;
+ }
+
lockres->l_requested = level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ gen = lockres_set_pending(lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
- BUG_ON(level == LKM_IVMODE);
- BUG_ON(level == LKM_NLMODE);
+ BUG_ON(level == DLM_LOCK_IV);
+ BUG_ON(level == DLM_LOCK_NL);
mlog(0, "lock %s, convert from %d to level = %d\n",
lockres->l_name, lockres->l_level, level);
/* call dlm_lock to upgrade lock now */
- status = dlmlock(osb->dlm,
- level,
- &lockres->l_lksb,
- lkm_flags|LKM_CONVERT,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- ocfs2_locking_ast,
- lockres,
- ocfs2_blocking_ast);
- if (status != DLM_NORMAL) {
- if ((lkm_flags & LKM_NOQUEUE) &&
- (status == DLM_NOTQUEUED))
- ret = -EAGAIN;
- else {
- ocfs2_log_dlm_error("dlmlock", status,
- lockres);
- ret = -EINVAL;
+ ret = ocfs2_dlm_lock(osb->cconn,
+ level,
+ &lockres->l_lksb,
+ lkm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
+ lockres_clear_pending(lockres, gen, osb);
+ if (ret) {
+ if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
+ (ret != -EAGAIN)) {
+ ocfs2_log_dlm_error("ocfs2_dlm_lock",
+ ret, lockres);
}
ocfs2_recover_from_dlm_error(lockres, 1);
goto out;
}
- mlog(0, "lock %s, successfull return from dlmlock\n",
+ mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
lockres->l_name);
/* At this point we've gone inside the dlm and need to
mlog_entry_void();
spin_lock_irqsave(&lockres->l_lock, flags);
ocfs2_dec_holders(lockres, level);
- ocfs2_vote_on_unlock(osb, lockres);
+ ocfs2_downconvert_on_unlock(osb, lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
}
int ex,
int local)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
unsigned long flags;
- int lkm_flags = local ? LKM_LOCAL : 0;
+ u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
spin_lock_irqsave(&lockres->l_lock, flags);
BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
}
/*
- * We don't want to use LKM_LOCAL on a meta data lock as they
+ * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
* don't use a generation in their lock names.
*/
- ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
- if (ret) {
- mlog_errno(ret);
- goto bail;
- }
-
- ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
+ ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
if (ret) {
mlog_errno(ret);
goto bail;
lockres = &OCFS2_I(inode)->ip_rw_lockres;
- level = write ? LKM_EXMODE : LKM_PRMODE;
+ level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
0);
void ocfs2_rw_unlock(struct inode *inode, int write)
{
- int level = write ? LKM_EXMODE : LKM_PRMODE;
+ int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
lockres = &OCFS2_I(inode)->ip_open_lockres;
status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
- LKM_PRMODE, 0, 0);
+ DLM_LOCK_PR, 0, 0);
if (status < 0)
mlog_errno(status);
lockres = &OCFS2_I(inode)->ip_open_lockres;
- level = write ? LKM_EXMODE : LKM_PRMODE;
+ level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
/*
* The file system may already holding a PRMODE/EXMODE open lock.
- * Since we pass LKM_NOQUEUE, the request won't block waiting on
+ * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
* other nodes and the -EAGAIN will indicate to the caller that
* this inode is still in use.
*/
status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
- level, LKM_NOQUEUE, 0);
+ level, DLM_LKF_NOQUEUE, 0);
out:
mlog_exit(status);
if(lockres->l_ro_holders)
ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
- LKM_PRMODE);
+ DLM_LOCK_PR);
if(lockres->l_ex_holders)
ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
- LKM_EXMODE);
+ DLM_LOCK_EX);
out:
mlog_exit_void();
}
-int ocfs2_data_lock_full(struct inode *inode,
- int write,
- int arg_flags)
+static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
+ int level)
{
- int status = 0, level;
- struct ocfs2_lock_res *lockres;
- struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+ int ret;
+ struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
+ unsigned long flags;
+ struct ocfs2_mask_waiter mw;
- BUG_ON(!inode);
+ ocfs2_init_mask_waiter(&mw);
- mlog_entry_void();
+retry_cancel:
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+ ret = ocfs2_prepare_cancel_convert(osb, lockres);
+ if (ret) {
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+ ret = ocfs2_cancel_convert(osb, lockres);
+ if (ret < 0) {
+ mlog_errno(ret);
+ goto out;
+ }
+ goto retry_cancel;
+ }
+ lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
- mlog(0, "inode %llu take %s DATA lock\n",
- (unsigned long long)OCFS2_I(inode)->ip_blkno,
- write ? "EXMODE" : "PRMODE");
+ ocfs2_wait_for_mask(&mw);
+ goto retry_cancel;
+ }
- /* We'll allow faking a readonly data lock for
- * rodevices. */
- if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
- if (write) {
- status = -EROFS;
- mlog_errno(status);
+ ret = -ERESTARTSYS;
+ /*
+ * We may still have gotten the lock, in which case there's no
+ * point to restarting the syscall.
+ */
+ if (lockres->l_level == level)
+ ret = 0;
+
+ mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
+ lockres->l_flags, lockres->l_level, lockres->l_action);
+
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+out:
+ return ret;
+}
+
+/*
+ * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
+ * flock() calls. The locking approach this requires is sufficiently
+ * different from all other cluster lock types that we implement a
+ * seperate path to the "low-level" dlm calls. In particular:
+ *
+ * - No optimization of lock levels is done - we take at exactly
+ * what's been requested.
+ *
+ * - No lock caching is employed. We immediately downconvert to
+ * no-lock at unlock time. This also means flock locks never go on
+ * the blocking list).
+ *
+ * - Since userspace can trivially deadlock itself with flock, we make
+ * sure to allow cancellation of a misbehaving applications flock()
+ * request.
+ *
+ * - Access to any flock lockres doesn't require concurrency, so we
+ * can simplify the code by requiring the caller to guarantee
+ * serialization of dlmglue flock calls.
+ */
+int ocfs2_file_lock(struct file *file, int ex, int trylock)
+{
+ int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
+ unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
+ unsigned long flags;
+ struct ocfs2_file_private *fp = file->private_data;
+ struct ocfs2_lock_res *lockres = &fp->fp_flock;
+ struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
+ struct ocfs2_mask_waiter mw;
+
+ ocfs2_init_mask_waiter(&mw);
+
+ if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
+ (lockres->l_level > DLM_LOCK_NL)) {
+ mlog(ML_ERROR,
+ "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
+ "level: %u\n", lockres->l_name, lockres->l_flags,
+ lockres->l_level);
+ return -EINVAL;
+ }
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
+ lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ /*
+ * Get the lock at NLMODE to start - that way we
+ * can cancel the upconvert request if need be.
+ */
+ ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
+ if (ret < 0) {
+ mlog_errno(ret);
+ goto out;
}
- goto out;
+
+ ret = ocfs2_wait_for_mask(&mw);
+ if (ret) {
+ mlog_errno(ret);
+ goto out;
+ }
+ spin_lock_irqsave(&lockres->l_lock, flags);
}
- if (ocfs2_mount_local(osb))
- goto out;
+ lockres->l_action = OCFS2_AST_CONVERT;
+ lkm_flags |= LKM_CONVERT;
+ lockres->l_requested = level;
+ lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
- lockres = &OCFS2_I(inode)->ip_data_lockres;
+ lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
- level = write ? LKM_EXMODE : LKM_PRMODE;
+ ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
+ lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
+ if (ret) {
+ if (!trylock || (ret != -EAGAIN)) {
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
+ ret = -EINVAL;
+ }
- status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
- 0, arg_flags);
- if (status < 0 && status != -EAGAIN)
- mlog_errno(status);
+ ocfs2_recover_from_dlm_error(lockres, 1);
+ lockres_remove_mask_waiter(lockres, &mw);
+ goto out;
+ }
+
+ ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
+ if (ret == -ERESTARTSYS) {
+ /*
+ * Userspace can cause deadlock itself with
+ * flock(). Current behavior locally is to allow the
+ * deadlock, but abort the system call if a signal is
+ * received. We follow this example, otherwise a
+ * poorly written program could sit in kernel until
+ * reboot.
+ *
+ * Handling this is a bit more complicated for Ocfs2
+ * though. We can't exit this function with an
+ * outstanding lock request, so a cancel convert is
+ * required. We intentionally overwrite 'ret' - if the
+ * cancel fails and the lock was granted, it's easier
+ * to just bubble sucess back up to the user.
+ */
+ ret = ocfs2_flock_handle_signal(lockres, level);
+ } else if (!ret && (level > lockres->l_level)) {
+ /* Trylock failed asynchronously */
+ BUG_ON(!trylock);
+ ret = -EAGAIN;
+ }
out:
- mlog_exit(status);
- return status;
+
+ mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
+ lockres->l_name, ex, trylock, ret);
+ return ret;
}
-/* see ocfs2_meta_lock_with_page() */
-int ocfs2_data_lock_with_page(struct inode *inode,
- int write,
- struct page *page)
+void ocfs2_file_unlock(struct file *file)
{
int ret;
+ unsigned int gen;
+ unsigned long flags;
+ struct ocfs2_file_private *fp = file->private_data;
+ struct ocfs2_lock_res *lockres = &fp->fp_flock;
+ struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
+ struct ocfs2_mask_waiter mw;
- ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
- if (ret == -EAGAIN) {
- unlock_page(page);
- if (ocfs2_data_lock(inode, write) == 0)
- ocfs2_data_unlock(inode, write);
- ret = AOP_TRUNCATED_PAGE;
+ ocfs2_init_mask_waiter(&mw);
+
+ if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
+ return;
+
+ if (lockres->l_level == LKM_NLMODE)
+ return;
+
+ mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
+ lockres->l_name, lockres->l_flags, lockres->l_level,
+ lockres->l_action);
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ /*
+ * Fake a blocking ast for the downconvert code.
+ */
+ lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
+ lockres->l_blocking = DLM_LOCK_EX;
+
+ gen = ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
+ lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0, gen);
+ if (ret) {
+ mlog_errno(ret);
+ return;
}
- return ret;
+ ret = ocfs2_wait_for_mask(&mw);
+ if (ret)
+ mlog_errno(ret);
}
-static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
- struct ocfs2_lock_res *lockres)
+static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres)
{
int kick = 0;
mlog_entry_void();
/* If we know that another node is waiting on our lock, kick
- * the vote thread * pre-emptively when we reach a release
+ * the downconvert thread * pre-emptively when we reach a release
* condition. */
if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
switch(lockres->l_blocking) {
- case LKM_EXMODE:
+ case DLM_LOCK_EX:
if (!lockres->l_ex_holders && !lockres->l_ro_holders)
kick = 1;
break;
- case LKM_PRMODE:
+ case DLM_LOCK_PR:
if (!lockres->l_ex_holders)
kick = 1;
break;
}
if (kick)
- ocfs2_kick_vote_thread(osb);
-
- mlog_exit_void();
-}
-
-void ocfs2_data_unlock(struct inode *inode,
- int write)
-{
- int level = write ? LKM_EXMODE : LKM_PRMODE;
- struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
- struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-
- mlog_entry_void();
-
- mlog(0, "inode %llu drop %s DATA lock\n",
- (unsigned long long)OCFS2_I(inode)->ip_blkno,
- write ? "EXMODE" : "PRMODE");
-
- if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
- !ocfs2_mount_local(osb))
- ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
+ ocfs2_wake_downconvert_thread(osb);
mlog_exit_void();
}
/* Call this with the lockres locked. I am reasonably sure we don't
* need ip_lock in this function as anyone who would be changing those
- * values is supposed to be blocked in ocfs2_meta_lock right now. */
+ * values is supposed to be blocked in ocfs2_inode_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
struct ocfs2_inode_info *oi = OCFS2_I(inode);
- struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
+ struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
struct ocfs2_meta_lvb *lvb;
mlog_entry_void();
- lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
/*
* Invalidate the LVB of a deleted inode - this way other
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
struct ocfs2_inode_info *oi = OCFS2_I(inode);
- struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
+ struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
struct ocfs2_meta_lvb *lvb;
mlog_entry_void();
mlog_meta_lvb(0, lockres);
- lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
/* We're safe here without the lockres lock... */
spin_lock(&oi->ip_lock);
static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
struct ocfs2_lock_res *lockres)
{
- struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ struct ocfs2_meta_lvb *lvb =
+ (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
if (lvb->lvb_version == OCFS2_LVB_VERSION
&& be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
}
/* may or may not return a bh if it went to disk. */
-static int ocfs2_meta_lock_update(struct inode *inode,
+static int ocfs2_inode_lock_update(struct inode *inode,
struct buffer_head **bh)
{
int status = 0;
struct ocfs2_inode_info *oi = OCFS2_I(inode);
- struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
+ struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
struct ocfs2_dinode *fe;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
* returns < 0 error if the callback will never be called, otherwise
* the result of the lock will be communicated via the callback.
*/
-int ocfs2_meta_lock_full(struct inode *inode,
+int ocfs2_inode_lock_full(struct inode *inode,
struct buffer_head **ret_bh,
int ex,
int arg_flags)
{
- int status, level, dlm_flags, acquired;
+ int status, level, acquired;
+ u32 dlm_flags;
struct ocfs2_lock_res *lockres = NULL;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
struct buffer_head *local_bh = NULL;
goto local;
if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
- wait_event(osb->recovery_event,
- ocfs2_node_map_is_empty(osb, &osb->recovery_map));
+ ocfs2_wait_for_recovery(osb);
- lockres = &OCFS2_I(inode)->ip_meta_lockres;
- level = ex ? LKM_EXMODE : LKM_PRMODE;
+ lockres = &OCFS2_I(inode)->ip_inode_lockres;
+ level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
dlm_flags = 0;
if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
- dlm_flags |= LKM_NOQUEUE;
+ dlm_flags |= DLM_LKF_NOQUEUE;
status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
if (status < 0) {
* committed to owning this lock so we don't allow signals to
* abort the operation. */
if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
- wait_event(osb->recovery_event,
- ocfs2_node_map_is_empty(osb, &osb->recovery_map));
+ ocfs2_wait_for_recovery(osb);
local:
/*
}
/* This is fun. The caller may want a bh back, or it may
- * not. ocfs2_meta_lock_update definitely wants one in, but
+ * not. ocfs2_inode_lock_update definitely wants one in, but
* may or may not read one, depending on what's in the
* LVB. The result of all of this is that we've *only* gone to
* disk if we have to, so the complexity is worthwhile. */
- status = ocfs2_meta_lock_update(inode, &local_bh);
+ status = ocfs2_inode_lock_update(inode, &local_bh);
if (status < 0) {
if (status != -ENOENT)
mlog_errno(status);
*ret_bh = NULL;
}
if (acquired)
- ocfs2_meta_unlock(inode, ex);
+ ocfs2_inode_unlock(inode, ex);
}
if (local_bh)
}
/*
- * This is working around a lock inversion between tasks acquiring DLM locks
- * while holding a page lock and the vote thread which blocks dlm lock acquiry
- * while acquiring page locks.
+ * This is working around a lock inversion between tasks acquiring DLM
+ * locks while holding a page lock and the downconvert thread which
+ * blocks dlm lock acquiry while acquiring page locks.
*
* ** These _with_page variantes are only intended to be called from aop
* methods that hold page locks and return a very specific *positive* error
* code that aop methods pass up to the VFS -- test for errors with != 0. **
*
- * The DLM is called such that it returns -EAGAIN if it would have blocked
- * waiting for the vote thread. In that case we unlock our page so the vote
- * thread can make progress. Once we've done this we have to return
- * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
- * into the VFS who will then immediately retry the aop call.
+ * The DLM is called such that it returns -EAGAIN if it would have
+ * blocked waiting for the downconvert thread. In that case we unlock
+ * our page so the downconvert thread can make progress. Once we've
+ * done this we have to return AOP_TRUNCATED_PAGE so the aop method
+ * that called us can bubble that back up into the VFS who will then
+ * immediately retry the aop call.
*
* We do a blocking lock and immediate unlock before returning, though, so that
* the lock has a great chance of being cached on this node by the time the VFS
* ping locks back and forth, but that's a risk we're willing to take to avoid
* the lock inversion simply.
*/
-int ocfs2_meta_lock_with_page(struct inode *inode,
+int ocfs2_inode_lock_with_page(struct inode *inode,
struct buffer_head **ret_bh,
int ex,
struct page *page)
{
int ret;
- ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
+ ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
if (ret == -EAGAIN) {
unlock_page(page);
- if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
- ocfs2_meta_unlock(inode, ex);
+ if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
+ ocfs2_inode_unlock(inode, ex);
ret = AOP_TRUNCATED_PAGE;
}
return ret;
}
-int ocfs2_meta_lock_atime(struct inode *inode,
+int ocfs2_inode_lock_atime(struct inode *inode,
struct vfsmount *vfsmnt,
int *level)
{
int ret;
mlog_entry_void();
- ret = ocfs2_meta_lock(inode, NULL, 0);
+ ret = ocfs2_inode_lock(inode, NULL, 0);
if (ret < 0) {
mlog_errno(ret);
return ret;
if (ocfs2_should_update_atime(inode, vfsmnt)) {
struct buffer_head *bh = NULL;
- ocfs2_meta_unlock(inode, 0);
- ret = ocfs2_meta_lock(inode, &bh, 1);
+ ocfs2_inode_unlock(inode, 0);
+ ret = ocfs2_inode_lock(inode, &bh, 1);
if (ret < 0) {
mlog_errno(ret);
return ret;
return ret;
}
-void ocfs2_meta_unlock(struct inode *inode,
+void ocfs2_inode_unlock(struct inode *inode,
int ex)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
- struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+ struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
mlog_entry_void();
int ex)
{
int status = 0;
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
- struct buffer_head *bh;
- struct ocfs2_slot_info *si = osb->slot_info;
mlog_entry_void();
goto bail;
}
if (status) {
- bh = si->si_bh;
- status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
- si->si_inode);
- if (status == 0)
- ocfs2_update_slot_info(si);
+ status = ocfs2_refresh_slot_info(osb);
ocfs2_complete_lock_res_refresh(lockres, status);
void ocfs2_super_unlock(struct ocfs2_super *osb,
int ex)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
if (!ocfs2_mount_local(osb))
if (ocfs2_mount_local(osb))
return 0;
- status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
+ status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
if (status < 0)
mlog_errno(status);
struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
if (!ocfs2_mount_local(osb))
- ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
+ ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
}
int ocfs2_dentry_lock(struct dentry *dentry, int ex)
{
int ret;
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
{
- int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
lockres->l_blocking);
/* Dump the raw LVB */
- lvb = lockres->l_lksb.lvb;
+ lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
for(i = 0; i < DLM_LVB_LEN; i++)
seq_printf(m, "0x%x\t", lvb[i]);
return 0;
}
-static struct seq_operations ocfs2_dlm_seq_ops = {
+static const struct seq_operations ocfs2_dlm_seq_ops = {
.start = ocfs2_dlm_seq_start,
.stop = ocfs2_dlm_seq_stop,
.next = ocfs2_dlm_seq_next,
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
int status = 0;
- u32 dlm_key;
- struct dlm_ctxt *dlm = NULL;
+ struct ocfs2_cluster_connection *conn = NULL;
mlog_entry_void();
- if (ocfs2_mount_local(osb))
+ if (ocfs2_mount_local(osb)) {
+ osb->node_num = 0;
goto local;
+ }
status = ocfs2_dlm_init_debug(osb);
if (status < 0) {
goto bail;
}
- /* launch vote thread */
- osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
- if (IS_ERR(osb->vote_task)) {
- status = PTR_ERR(osb->vote_task);
- osb->vote_task = NULL;
+ /* launch downconvert thread */
+ osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
+ if (IS_ERR(osb->dc_task)) {
+ status = PTR_ERR(osb->dc_task);
+ osb->dc_task = NULL;
mlog_errno(status);
goto bail;
}
- /* used by the dlm code to make message headers unique, each
- * node in this domain must agree on this. */
- dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
-
/* for now, uuid == domain */
- dlm = dlm_register_domain(osb->uuid_str, dlm_key);
- if (IS_ERR(dlm)) {
- status = PTR_ERR(dlm);
+ status = ocfs2_cluster_connect(osb->osb_cluster_stack,
+ osb->uuid_str,
+ strlen(osb->uuid_str),
+ ocfs2_do_node_down, osb,
+ &conn);
+ if (status) {
mlog_errno(status);
goto bail;
}
- dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
+ status = ocfs2_cluster_this_node(&osb->node_num);
+ if (status < 0) {
+ mlog_errno(status);
+ mlog(ML_ERROR,
+ "could not find this host's node number\n");
+ ocfs2_cluster_disconnect(conn, 0);
+ goto bail;
+ }
local:
ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
- osb->dlm = dlm;
+ osb->cconn = conn;
status = 0;
bail:
if (status < 0) {
ocfs2_dlm_shutdown_debug(osb);
- if (osb->vote_task)
- kthread_stop(osb->vote_task);
+ if (osb->dc_task)
+ kthread_stop(osb->dc_task);
}
mlog_exit(status);
return status;
}
-void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
+void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
+ int hangup_pending)
{
mlog_entry_void();
- dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
-
ocfs2_drop_osb_locks(osb);
- if (osb->vote_task) {
- kthread_stop(osb->vote_task);
- osb->vote_task = NULL;
+ /*
+ * Now that we have dropped all locks and ocfs2_dismount_volume()
+ * has disabled recovery, the DLM won't be talking to us. It's
+ * safe to tear things down before disconnecting the cluster.
+ */
+
+ if (osb->dc_task) {
+ kthread_stop(osb->dc_task);
+ osb->dc_task = NULL;
}
ocfs2_lock_res_free(&osb->osb_super_lockres);
ocfs2_lock_res_free(&osb->osb_rename_lockres);
- dlm_unregister_domain(osb->dlm);
- osb->dlm = NULL;
+ ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
+ osb->cconn = NULL;
ocfs2_dlm_shutdown_debug(osb);
mlog_exit_void();
}
-static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
+static void ocfs2_unlock_ast(void *opaque, int error)
{
struct ocfs2_lock_res *lockres = opaque;
unsigned long flags;
lockres->l_unlock_action);
spin_lock_irqsave(&lockres->l_lock, flags);
- /* We tried to cancel a convert request, but it was already
- * granted. All we want to do here is clear our unlock
- * state. The wake_up call done at the bottom is redundant
- * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
- * hurt anything anyway */
- if (status == DLM_CANCELGRANT &&
- lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
- mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
-
- /* We don't clear the busy flag in this case as it
- * should have been cleared by the ast which the dlm
- * has called. */
- goto complete_unlock;
- }
-
- if (status != DLM_NORMAL) {
- mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
- "unlock_action %d\n", status, lockres->l_name,
+ if (error) {
+ mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
+ "unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
spin_unlock_irqrestore(&lockres->l_lock, flags);
return;
lockres->l_action = OCFS2_AST_INVALID;
break;
case OCFS2_UNLOCK_DROP_LOCK:
- lockres->l_level = LKM_IVMODE;
+ lockres->l_level = DLM_LOCK_IV;
break;
default:
BUG();
}
lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
-complete_unlock:
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
spin_unlock_irqrestore(&lockres->l_lock, flags);
static int ocfs2_drop_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
- enum dlm_status status;
+ int ret;
unsigned long flags;
- int lkm_flags = 0;
+ u32 lkm_flags = 0;
/* We didn't get anywhere near actually using this lockres. */
if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
goto out;
if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
- lkm_flags |= LKM_VALBLK;
+ lkm_flags |= DLM_LKF_VALBLK;
spin_lock_irqsave(&lockres->l_lock, flags);
if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
- lockres->l_level == LKM_EXMODE &&
+ lockres->l_level == DLM_LOCK_EX &&
!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
lockres->l_ops->set_lvb(lockres);
}
mlog(0, "lock %s\n", lockres->l_name);
- status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
- ocfs2_unlock_ast, lockres);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmunlock", status, lockres);
+ ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
+ lockres);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
- dlm_print_one_lock(lockres->l_lksb.lockid);
+ ocfs2_dlm_dump_lksb(&lockres->l_lksb);
BUG();
}
- mlog(0, "lock %s, successfull return from dlmunlock\n",
+ mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
lockres->l_name);
ocfs2_wait_on_busy_lock(lockres);
/* Mark the lockres as being dropped. It will no longer be
* queued if blocking, but we still may have to wait on it
- * being dequeued from the vote thread before we can consider
+ * being dequeued from the downconvert thread before we can consider
* it safe to drop.
*
* You can *not* attempt to call cluster_lock on this lockres anymore. */
status = err;
err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
- &OCFS2_I(inode)->ip_data_lockres);
- if (err < 0)
- mlog_errno(err);
- if (err < 0 && !status)
- status = err;
-
- err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
- &OCFS2_I(inode)->ip_meta_lockres);
+ &OCFS2_I(inode)->ip_inode_lockres);
if (err < 0)
mlog_errno(err);
if (err < 0 && !status)
return status;
}
-static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
- int new_level)
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+ int new_level)
{
assert_spin_locked(&lockres->l_lock);
- BUG_ON(lockres->l_blocking <= LKM_NLMODE);
+ BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
if (lockres->l_level <= new_level) {
- mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
+ mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
lockres->l_level, new_level);
BUG();
}
lockres->l_action = OCFS2_AST_DOWNCONVERT;
lockres->l_requested = new_level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ return lockres_set_pending(lockres);
}
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int new_level,
- int lvb)
+ int lvb,
+ unsigned int generation)
{
- int ret, dlm_flags = LKM_CONVERT;
- enum dlm_status status;
+ int ret;
+ u32 dlm_flags = DLM_LKF_CONVERT;
mlog_entry_void();
if (lvb)
- dlm_flags |= LKM_VALBLK;
-
- status = dlmlock(osb->dlm,
- new_level,
- &lockres->l_lksb,
- dlm_flags,
- lockres->l_name,
- OCFS2_LOCK_ID_MAX_LEN - 1,
- ocfs2_locking_ast,
- lockres,
- ocfs2_blocking_ast);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmlock", status, lockres);
- ret = -EINVAL;
+ dlm_flags |= DLM_LKF_VALBLK;
+
+ ret = ocfs2_dlm_lock(osb->cconn,
+ new_level,
+ &lockres->l_lksb,
+ dlm_flags,
+ lockres->l_name,
+ OCFS2_LOCK_ID_MAX_LEN - 1,
+ lockres);
+ lockres_clear_pending(lockres, generation, osb);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
goto bail;
}
return ret;
}
-/* returns 1 when the caller should unlock and call dlmunlock */
+/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
struct ocfs2_lock_res *lockres)
{
int ret;
- enum dlm_status status;
mlog_entry_void();
mlog(0, "lock %s\n", lockres->l_name);
- ret = 0;
- status = dlmunlock(osb->dlm,
- &lockres->l_lksb,
- LKM_CANCEL,
- ocfs2_unlock_ast,
- lockres);
- if (status != DLM_NORMAL) {
- ocfs2_log_dlm_error("dlmunlock", status, lockres);
- ret = -EINVAL;
+ ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
+ DLM_LKF_CANCEL, lockres);
+ if (ret) {
+ ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 0);
}
- mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
+ mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
mlog_exit(ret);
return ret;
int new_level;
int ret = 0;
int set_lvb = 0;
+ unsigned int gen;
mlog_entry_void();
recheck:
if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+ /* XXX
+ * This is a *big* race. The OCFS2_LOCK_PENDING flag
+ * exists entirely for one reason - another thread has set
+ * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
+ *
+ * If we do ocfs2_cancel_convert() before the other thread
+ * calls dlm_lock(), our cancel will do nothing. We will
+ * get no ast, and we will have no way of knowing the
+ * cancel failed. Meanwhile, the other thread will call
+ * into dlm_lock() and wait...forever.
+ *
+ * Why forever? Because another node has asked for the
+ * lock first; that's why we're here in unblock_lock().
+ *
+ * The solution is OCFS2_LOCK_PENDING. When PENDING is
+ * set, we just requeue the unblock. Only when the other
+ * thread has called dlm_lock() and cleared PENDING will
+ * we then cancel their request.
+ *
+ * All callers of dlm_lock() must set OCFS2_DLM_PENDING
+ * at the same time they set OCFS2_DLM_BUSY. They must
+ * clear OCFS2_DLM_PENDING after dlm_lock() returns.
+ */
+ if (lockres->l_flags & OCFS2_LOCK_PENDING)
+ goto leave_requeue;
+
ctl->requeue = 1;
ret = ocfs2_prepare_cancel_convert(osb, lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
/* if we're blocking an exclusive and we have *any* holders,
* then requeue. */
- if ((lockres->l_blocking == LKM_EXMODE)
+ if ((lockres->l_blocking == DLM_LOCK_EX)
&& (lockres->l_ex_holders || lockres->l_ro_holders))
goto leave_requeue;
/* If it's a PR we're blocking, then only
* requeue if we've got any EX holders */
- if (lockres->l_blocking == LKM_PRMODE &&
+ if (lockres->l_blocking == DLM_LOCK_PR &&
lockres->l_ex_holders)
goto leave_requeue;
ctl->requeue = 0;
if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
- if (lockres->l_level == LKM_EXMODE)
+ if (lockres->l_level == DLM_LOCK_EX)
set_lvb = 1;
/*
lockres->l_ops->set_lvb(lockres);
}
- ocfs2_prepare_downconvert(lockres, new_level);
+ gen = ocfs2_prepare_downconvert(lockres, new_level);
spin_unlock_irqrestore(&lockres->l_lock, flags);
- ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
+ ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
+ gen);
+
leave:
mlog_exit(ret);
return ret;
inode = ocfs2_lock_res_inode(lockres);
mapping = inode->i_mapping;
+ if (!S_ISREG(inode->i_mode))
+ goto out;
+
/*
* We need this before the filemap_fdatawrite() so that it can
* transfer the dirty bit from the PTE to the
(unsigned long long)OCFS2_I(inode)->ip_blkno);
}
sync_mapping_buffers(mapping);
- if (blocking == LKM_EXMODE) {
+ if (blocking == DLM_LOCK_EX) {
truncate_inode_pages(mapping, 0);
} else {
/* We only need to wait on the I/O if we're not also
filemap_fdatawait(mapping);
}
+out:
return UNBLOCK_CONTINUE;
}
struct inode *inode = ocfs2_lock_res_inode(lockres);
int checkpointed = ocfs2_inode_fully_checkpointed(inode);
- BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
- BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
+ BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
+ BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
if (checkpointed)
return 1;
/*
* Does the final reference drop on our dentry lock. Right now this
- * happens in the vote thread, but we could choose to simplify the
+ * happens in the downconvert thread, but we could choose to simplify the
* dlmglue API and push these off to the ocfs2_wq in the future.
*/
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
* valid. The downconvert code will retain a PR for this node,
* so there's no further work to do.
*/
- if (blocking == LKM_PRMODE)
+ if (blocking == DLM_LOCK_PR)
return UNBLOCK_CONTINUE;
/*
return UNBLOCK_CONTINUE_POST;
}
-void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
- struct ocfs2_lock_res *lockres)
+/*
+ * This is the filesystem locking protocol. It provides the lock handling
+ * hooks for the underlying DLM. It has a maximum version number.
+ * The version number allows interoperability with systems running at
+ * the same major number and an equal or smaller minor number.
+ *
+ * Whenever the filesystem does new things with locks (adds or removes a
+ * lock, orders them differently, does different things underneath a lock),
+ * the version must be changed. The protocol is negotiated when joining
+ * the dlm domain. A node may join the domain if its major version is
+ * identical to all other nodes and its minor version is greater than
+ * or equal to all other nodes. When its minor version is greater than
+ * the other nodes, it will run at the minor version specified by the
+ * other nodes.
+ *
+ * If a locking change is made that will not be compatible with older
+ * versions, the major number must be increased and the minor version set
+ * to zero. If a change merely adds a behavior that can be disabled when
+ * speaking to older versions, the minor version must be increased. If a
+ * change adds a fully backwards compatible change (eg, LVB changes that
+ * are just ignored by older versions), the version does not need to be
+ * updated.
+ */
+static struct ocfs2_locking_protocol lproto = {
+ .lp_max_version = {
+ .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+ .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+ },
+ .lp_lock_ast = ocfs2_locking_ast,
+ .lp_blocking_ast = ocfs2_blocking_ast,
+ .lp_unlock_ast = ocfs2_unlock_ast,
+};
+
+void ocfs2_set_locking_protocol(void)
+{
+ ocfs2_stack_glue_set_locking_protocol(&lproto);
+}
+
+
+static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres)
{
int status;
struct ocfs2_unblock_ctl ctl = {0, 0,};
mlog(0, "lockres %s blocked.\n", lockres->l_name);
/* Detect whether a lock has been marked as going away while
- * the vote thread was processing other things. A lock can
+ * the downconvert thread was processing other things. A lock can
* still be marked with OCFS2_LOCK_FREEING after this check,
* but short circuiting here will still save us some
* performance. */
lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
- spin_lock(&osb->vote_task_lock);
+ spin_lock(&osb->dc_task_lock);
if (list_empty(&lockres->l_blocked_list)) {
list_add_tail(&lockres->l_blocked_list,
&osb->blocked_lock_list);
osb->blocked_lock_count++;
}
- spin_unlock(&osb->vote_task_lock);
+ spin_unlock(&osb->dc_task_lock);
mlog_exit_void();
}
+
+static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
+{
+ unsigned long processed;
+ struct ocfs2_lock_res *lockres;
+
+ mlog_entry_void();
+
+ spin_lock(&osb->dc_task_lock);
+ /* grab this early so we know to try again if a state change and
+ * wake happens part-way through our work */
+ osb->dc_work_sequence = osb->dc_wake_sequence;
+
+ processed = osb->blocked_lock_count;
+ while (processed) {
+ BUG_ON(list_empty(&osb->blocked_lock_list));
+
+ lockres = list_entry(osb->blocked_lock_list.next,
+ struct ocfs2_lock_res, l_blocked_list);
+ list_del_init(&lockres->l_blocked_list);
+ osb->blocked_lock_count--;
+ spin_unlock(&osb->dc_task_lock);
+
+ BUG_ON(!processed);
+ processed--;
+
+ ocfs2_process_blocked_lock(osb, lockres);
+
+ spin_lock(&osb->dc_task_lock);
+ }
+ spin_unlock(&osb->dc_task_lock);
+
+ mlog_exit_void();
+}
+
+static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
+{
+ int empty = 0;
+
+ spin_lock(&osb->dc_task_lock);
+ if (list_empty(&osb->blocked_lock_list))
+ empty = 1;
+
+ spin_unlock(&osb->dc_task_lock);
+ return empty;
+}
+
+static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
+{
+ int should_wake = 0;
+
+ spin_lock(&osb->dc_task_lock);
+ if (osb->dc_work_sequence != osb->dc_wake_sequence)
+ should_wake = 1;
+ spin_unlock(&osb->dc_task_lock);
+
+ return should_wake;
+}
+
+static int ocfs2_downconvert_thread(void *arg)
+{
+ int status = 0;
+ struct ocfs2_super *osb = arg;
+
+ /* only quit once we've been asked to stop and there is no more
+ * work available */
+ while (!(kthread_should_stop() &&
+ ocfs2_downconvert_thread_lists_empty(osb))) {
+
+ wait_event_interruptible(osb->dc_event,
+ ocfs2_downconvert_thread_should_wake(osb) ||
+ kthread_should_stop());
+
+ mlog(0, "downconvert_thread: awoken\n");
+
+ ocfs2_downconvert_thread_do_work(osb);
+ }
+
+ osb->dc_task = NULL;
+ return status;
+}
+
+void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
+{
+ spin_lock(&osb->dc_task_lock);
+ /* make sure the voting thread gets a swipe at whatever changes
+ * the caller may have made to the voting state */
+ osb->dc_wake_sequence++;
+ spin_unlock(&osb->dc_task_lock);
+ wake_up(&osb->dc_event);
+}