1 /* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
6 * Code which implements an OCFS2 specific interface to our DLM.
8 * Copyright (C) 2003, 2004 Oracle. All rights reserved.
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
30 #include <linux/smp_lock.h>
31 #include <linux/crc32.h>
32 #include <linux/kthread.h>
33 #include <linux/pagemap.h>
34 #include <linux/debugfs.h>
35 #include <linux/seq_file.h>
37 #include <cluster/heartbeat.h>
38 #include <cluster/nodemanager.h>
39 #include <cluster/tcp.h>
41 #include <dlm/dlmapi.h>
43 #define MLOG_MASK_PREFIX ML_DLM_GLUE
44 #include <cluster/masklog.h>
51 #include "extent_map.h"
52 #include "heartbeat.h"
60 #include "buffer_head_io.h"
62 struct ocfs2_mask_waiter {
63 struct list_head mw_item;
65 struct completion mw_complete;
66 unsigned long mw_mask;
67 unsigned long mw_goal;
/* AST (grant) and BAST (blocking) callbacks handed to the DLM for each
 * lock class.  The bast signature matches ocfs2_lock_res_ops->bast:
 * void (*)(void *, int). */
static void ocfs2_inode_ast_func(void *opaque);
static void ocfs2_inode_bast_func(void *opaque,
				  int level);
static void ocfs2_dentry_ast_func(void *opaque);
static void ocfs2_dentry_bast_func(void *opaque,
				   int level);
static void ocfs2_super_ast_func(void *opaque);
static void ocfs2_super_bast_func(void *opaque,
				  int level);
static void ocfs2_rename_ast_func(void *opaque);
static void ocfs2_rename_bast_func(void *opaque,
				   int level);
/*
 * Return value from ocfs2_convert_worker_t functions.
 *
 * These control the precise actions of ocfs2_generic_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};
98 struct ocfs2_unblock_ctl {
100 enum ocfs2_unblock_action unblock_action;
/* so far, all locks have gotten along with the same unlock ast */
static void ocfs2_unlock_ast_func(void *opaque,
				  enum dlm_status status);
/* Per-lock-class downconvert handlers, dispatched via
 * ocfs2_lock_res_ops->unblock from the vote/downconvert thread. */
static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
				     struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
				  struct ocfs2_unblock_ctl *ctl);
/* Fired after a dentry lock is fully dropped (see ocfs2_dentry_lops). */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 */
struct ocfs2_lock_res_ops {
	/* DLM grant (ast) and blocking (bast) notification callbacks. */
	void (*ast)(void *);
	void (*bast)(void *, int);
	void (*unlock_ast)(void *, enum dlm_status);
	/* Downconvert a blocked lock; returns nonzero on error. */
	int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);
	/* Optional: fired after an unblock when the handler requests it. */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      struct ocfs2_unblock_ctl *ctl,
				      ocfs2_convert_worker_t *worker);
155 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
156 .ast = ocfs2_inode_ast_func,
157 .bast = ocfs2_inode_bast_func,
158 .unlock_ast = ocfs2_unlock_ast_func,
159 .unblock = ocfs2_unblock_inode_lock,
163 static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
164 .ast = ocfs2_inode_ast_func,
165 .bast = ocfs2_inode_bast_func,
166 .unlock_ast = ocfs2_unlock_ast_func,
167 .unblock = ocfs2_unblock_meta,
168 .flags = LOCK_TYPE_REQUIRES_REFRESH,
171 static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
172 .ast = ocfs2_inode_ast_func,
173 .bast = ocfs2_inode_bast_func,
174 .unlock_ast = ocfs2_unlock_ast_func,
175 .unblock = ocfs2_unblock_data,
179 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
180 .ast = ocfs2_super_ast_func,
181 .bast = ocfs2_super_bast_func,
182 .unlock_ast = ocfs2_unlock_ast_func,
183 .unblock = ocfs2_unblock_osb_lock,
184 .flags = LOCK_TYPE_REQUIRES_REFRESH,
187 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
188 .ast = ocfs2_rename_ast_func,
189 .bast = ocfs2_rename_bast_func,
190 .unlock_ast = ocfs2_unlock_ast_func,
191 .unblock = ocfs2_unblock_osb_lock,
195 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
196 .ast = ocfs2_dentry_ast_func,
197 .bast = ocfs2_dentry_bast_func,
198 .unlock_ast = ocfs2_unlock_ast_func,
199 .unblock = ocfs2_unblock_dentry_lock,
200 .post_unlock = ocfs2_dentry_post_unlock,
204 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
206 return lockres->l_type == OCFS2_LOCK_TYPE_META ||
207 lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
208 lockres->l_type == OCFS2_LOCK_TYPE_RW;
211 static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres)
213 return lockres->l_type == OCFS2_LOCK_TYPE_SUPER;
216 static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres)
218 return lockres->l_type == OCFS2_LOCK_TYPE_RENAME;
221 static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres)
223 BUG_ON(!ocfs2_is_super_lock(lockres)
224 && !ocfs2_is_rename_lock(lockres));
226 return (struct ocfs2_super *) lockres->l_priv;
229 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
231 BUG_ON(!ocfs2_is_inode_lock(lockres));
233 return (struct inode *) lockres->l_priv;
236 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
238 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
240 return (struct ocfs2_dentry_lock *)lockres->l_priv;
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
/* Report a dlm API failure against a named lock resource. */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
	     "resource %s: %s\n", dlm_errname(_stat), _func,	\
	     _lockres->l_name, dlm_errmsg(_stat));		\
} while (0)
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
						  struct ocfs2_lock_res *lockres,
						  int new_level);
275 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
284 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
286 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
287 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
288 (long long)blkno, generation);
290 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
292 mlog(0, "built lock resource with name: %s\n", name);
297 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
299 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
300 struct ocfs2_dlm_debug *dlm_debug)
302 mlog(0, "Add tracking for lockres %s\n", res->l_name);
304 spin_lock(&ocfs2_dlm_tracking_lock);
305 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
306 spin_unlock(&ocfs2_dlm_tracking_lock);
309 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
311 spin_lock(&ocfs2_dlm_tracking_lock);
312 if (!list_empty(&res->l_debug_list))
313 list_del_init(&res->l_debug_list);
314 spin_unlock(&ocfs2_dlm_tracking_lock);
317 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
318 struct ocfs2_lock_res *res,
319 enum ocfs2_lock_type type,
320 struct ocfs2_lock_res_ops *ops,
327 res->l_level = LKM_IVMODE;
328 res->l_requested = LKM_IVMODE;
329 res->l_blocking = LKM_IVMODE;
330 res->l_action = OCFS2_AST_INVALID;
331 res->l_unlock_action = OCFS2_UNLOCK_INVALID;
333 res->l_flags = OCFS2_LOCK_INITIALIZED;
335 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
338 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
340 /* This also clears out the lock status block */
341 memset(res, 0, sizeof(struct ocfs2_lock_res));
342 spin_lock_init(&res->l_lock);
343 init_waitqueue_head(&res->l_event);
344 INIT_LIST_HEAD(&res->l_blocked_list);
345 INIT_LIST_HEAD(&res->l_mask_waiters);
348 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
349 enum ocfs2_lock_type type,
350 unsigned int generation,
353 struct ocfs2_lock_res_ops *ops;
356 case OCFS2_LOCK_TYPE_RW:
357 ops = &ocfs2_inode_rw_lops;
359 case OCFS2_LOCK_TYPE_META:
360 ops = &ocfs2_inode_meta_lops;
362 case OCFS2_LOCK_TYPE_DATA:
363 ops = &ocfs2_inode_data_lops;
366 mlog_bug_on_msg(1, "type: %d\n", type);
367 ops = NULL; /* thanks, gcc */
371 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
372 generation, res->l_name);
373 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
376 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
378 __be64 inode_blkno_be;
380 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
383 return be64_to_cpu(inode_blkno_be);
386 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
387 u64 parent, struct inode *inode)
390 u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
391 __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
392 struct ocfs2_lock_res *lockres = &dl->dl_lockres;
394 ocfs2_lock_res_init_once(lockres);
397 * Unfortunately, the standard lock naming scheme won't work
398 * here because we have two 16 byte values to use. Instead,
399 * we'll stuff the inode number as a binary value. We still
400 * want error prints to show something without garbling the
401 * display, so drop a null byte in there before the inode
402 * number. A future version of OCFS2 will likely use all
403 * binary lock names. The stringified names have been a
404 * tremendous aid in debugging, but now that the debugfs
405 * interface exists, we can mangle things there if need be.
407 * NOTE: We also drop the standard "pad" value (the total lock
408 * name size stays the same though - the last part is all
409 * zeros due to the memset in ocfs2_lock_res_init_once()
411 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
413 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
416 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
418 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
421 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
422 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
426 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
427 struct ocfs2_super *osb)
429 /* Superblock lockres doesn't come from a slab so we call init
430 * once on it manually. */
431 ocfs2_lock_res_init_once(res);
432 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
434 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
435 &ocfs2_super_lops, osb);
438 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
439 struct ocfs2_super *osb)
441 /* Rename lockres doesn't come from a slab so we call init
442 * once on it manually. */
443 ocfs2_lock_res_init_once(res);
444 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
445 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
446 &ocfs2_rename_lops, osb);
449 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
453 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
456 ocfs2_remove_lockres_tracking(res);
458 mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
459 "Lockres %s is on the blocked list\n",
461 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
462 "Lockres %s has mask waiters pending\n",
464 mlog_bug_on_msg(spin_is_locked(&res->l_lock),
465 "Lockres %s is locked\n",
467 mlog_bug_on_msg(res->l_ro_holders,
468 "Lockres %s has %u ro holders\n",
469 res->l_name, res->l_ro_holders);
470 mlog_bug_on_msg(res->l_ex_holders,
471 "Lockres %s has %u ex holders\n",
472 res->l_name, res->l_ex_holders);
474 /* Need to clear out the lock status block for the dlm */
475 memset(&res->l_lksb, 0, sizeof(res->l_lksb));
481 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
490 lockres->l_ex_holders++;
493 lockres->l_ro_holders++;
502 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
511 BUG_ON(!lockres->l_ex_holders);
512 lockres->l_ex_holders--;
515 BUG_ON(!lockres->l_ro_holders);
516 lockres->l_ro_holders--;
524 /* WARNING: This function lives in a world where the only three lock
525 * levels are EX, PR, and NL. It *will* have to be adjusted when more
526 * lock types are added. */
527 static inline int ocfs2_highest_compat_lock_level(int level)
529 int new_level = LKM_EXMODE;
531 if (level == LKM_EXMODE)
532 new_level = LKM_NLMODE;
533 else if (level == LKM_PRMODE)
534 new_level = LKM_PRMODE;
538 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
539 unsigned long newflags)
541 struct list_head *pos, *tmp;
542 struct ocfs2_mask_waiter *mw;
544 assert_spin_locked(&lockres->l_lock);
546 lockres->l_flags = newflags;
548 list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
549 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
550 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
553 list_del_init(&mw->mw_item);
555 complete(&mw->mw_complete);
558 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
560 lockres_set_flags(lockres, lockres->l_flags | or);
562 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
565 lockres_set_flags(lockres, lockres->l_flags & ~clear);
568 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
572 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
573 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
574 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
575 BUG_ON(lockres->l_blocking <= LKM_NLMODE);
577 lockres->l_level = lockres->l_requested;
578 if (lockres->l_level <=
579 ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
580 lockres->l_blocking = LKM_NLMODE;
581 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
583 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
588 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
592 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
593 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
595 /* Convert from RO to EX doesn't really need anything as our
596 * information is already up to data. Convert from NL to
597 * *anything* however should mark ourselves as needing an
599 if (lockres->l_level == LKM_NLMODE &&
600 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
601 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
603 lockres->l_level = lockres->l_requested;
604 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
609 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
613 BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
614 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
616 if (lockres->l_requested > LKM_NLMODE &&
617 !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
618 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
619 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
621 lockres->l_level = lockres->l_requested;
622 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
623 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
628 static void ocfs2_inode_ast_func(void *opaque)
630 struct ocfs2_lock_res *lockres = opaque;
632 struct dlm_lockstatus *lksb;
637 inode = ocfs2_lock_res_inode(lockres);
639 mlog(0, "AST fired for inode %llu, l_action = %u, type = %s\n",
640 (unsigned long long)OCFS2_I(inode)->ip_blkno, lockres->l_action,
641 ocfs2_lock_type_string(lockres->l_type));
643 BUG_ON(!ocfs2_is_inode_lock(lockres));
645 spin_lock_irqsave(&lockres->l_lock, flags);
647 lksb = &(lockres->l_lksb);
648 if (lksb->status != DLM_NORMAL) {
649 mlog(ML_ERROR, "ocfs2_inode_ast_func: lksb status value of %u "
650 "on inode %llu\n", lksb->status,
651 (unsigned long long)OCFS2_I(inode)->ip_blkno);
652 spin_unlock_irqrestore(&lockres->l_lock, flags);
657 switch(lockres->l_action) {
658 case OCFS2_AST_ATTACH:
659 ocfs2_generic_handle_attach_action(lockres);
660 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
662 case OCFS2_AST_CONVERT:
663 ocfs2_generic_handle_convert_action(lockres);
665 case OCFS2_AST_DOWNCONVERT:
666 ocfs2_generic_handle_downconvert_action(lockres);
669 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
670 "lockres flags = 0x%lx, unlock action: %u\n",
671 lockres->l_name, lockres->l_action, lockres->l_flags,
672 lockres->l_unlock_action);
677 /* set it to something invalid so if we get called again we
679 lockres->l_action = OCFS2_AST_INVALID;
680 spin_unlock_irqrestore(&lockres->l_lock, flags);
681 wake_up(&lockres->l_event);
686 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
689 int needs_downconvert = 0;
692 assert_spin_locked(&lockres->l_lock);
694 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
696 if (level > lockres->l_blocking) {
697 /* only schedule a downconvert if we haven't already scheduled
698 * one that goes low enough to satisfy the level we're
699 * blocking. this also catches the case where we get
701 if (ocfs2_highest_compat_lock_level(level) <
702 ocfs2_highest_compat_lock_level(lockres->l_blocking))
703 needs_downconvert = 1;
705 lockres->l_blocking = level;
708 mlog_exit(needs_downconvert);
709 return needs_downconvert;
712 static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
713 struct ocfs2_lock_res *lockres,
716 int needs_downconvert;
721 BUG_ON(level <= LKM_NLMODE);
723 spin_lock_irqsave(&lockres->l_lock, flags);
724 needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
725 if (needs_downconvert)
726 ocfs2_schedule_blocked_lock(osb, lockres);
727 spin_unlock_irqrestore(&lockres->l_lock, flags);
729 wake_up(&lockres->l_event);
731 ocfs2_kick_vote_thread(osb);
736 static void ocfs2_inode_bast_func(void *opaque, int level)
738 struct ocfs2_lock_res *lockres = opaque;
740 struct ocfs2_super *osb;
744 BUG_ON(!ocfs2_is_inode_lock(lockres));
746 inode = ocfs2_lock_res_inode(lockres);
747 osb = OCFS2_SB(inode->i_sb);
749 mlog(0, "BAST fired for inode %llu, blocking %d, level %d type %s\n",
750 (unsigned long long)OCFS2_I(inode)->ip_blkno, level,
751 lockres->l_level, ocfs2_lock_type_string(lockres->l_type));
753 ocfs2_generic_bast_func(osb, lockres, level);
758 static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres)
760 struct dlm_lockstatus *lksb = &lockres->l_lksb;
763 spin_lock_irqsave(&lockres->l_lock, flags);
765 if (lksb->status != DLM_NORMAL) {
766 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
767 lockres->l_name, lksb->status);
768 spin_unlock_irqrestore(&lockres->l_lock, flags);
772 switch(lockres->l_action) {
773 case OCFS2_AST_ATTACH:
774 ocfs2_generic_handle_attach_action(lockres);
776 case OCFS2_AST_CONVERT:
777 ocfs2_generic_handle_convert_action(lockres);
779 case OCFS2_AST_DOWNCONVERT:
780 ocfs2_generic_handle_downconvert_action(lockres);
786 /* set it to something invalid so if we get called again we
788 lockres->l_action = OCFS2_AST_INVALID;
790 wake_up(&lockres->l_event);
791 spin_unlock_irqrestore(&lockres->l_lock, flags);
794 static void ocfs2_super_ast_func(void *opaque)
796 struct ocfs2_lock_res *lockres = opaque;
799 mlog(0, "Superblock AST fired\n");
801 BUG_ON(!ocfs2_is_super_lock(lockres));
802 ocfs2_generic_ast_func(lockres);
807 static void ocfs2_super_bast_func(void *opaque,
810 struct ocfs2_lock_res *lockres = opaque;
811 struct ocfs2_super *osb;
814 mlog(0, "Superblock BAST fired\n");
816 BUG_ON(!ocfs2_is_super_lock(lockres));
817 osb = ocfs2_lock_res_super(lockres);
818 ocfs2_generic_bast_func(osb, lockres, level);
823 static void ocfs2_rename_ast_func(void *opaque)
825 struct ocfs2_lock_res *lockres = opaque;
829 mlog(0, "Rename AST fired\n");
831 BUG_ON(!ocfs2_is_rename_lock(lockres));
833 ocfs2_generic_ast_func(lockres);
838 static void ocfs2_rename_bast_func(void *opaque,
841 struct ocfs2_lock_res *lockres = opaque;
842 struct ocfs2_super *osb;
846 mlog(0, "Rename BAST fired\n");
848 BUG_ON(!ocfs2_is_rename_lock(lockres));
850 osb = ocfs2_lock_res_super(lockres);
851 ocfs2_generic_bast_func(osb, lockres, level);
856 static void ocfs2_dentry_ast_func(void *opaque)
858 struct ocfs2_lock_res *lockres = opaque;
862 ocfs2_generic_ast_func(lockres);
865 static void ocfs2_dentry_bast_func(void *opaque, int level)
867 struct ocfs2_lock_res *lockres = opaque;
868 struct ocfs2_dentry_lock *dl = lockres->l_priv;
869 struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
871 mlog(0, "Dentry bast: level: %d, name: %s\n", level,
874 ocfs2_generic_bast_func(osb, lockres, level);
877 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
883 spin_lock_irqsave(&lockres->l_lock, flags);
884 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
886 lockres->l_action = OCFS2_AST_INVALID;
888 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
889 spin_unlock_irqrestore(&lockres->l_lock, flags);
891 wake_up(&lockres->l_event);
895 /* Note: If we detect another process working on the lock (i.e.,
896 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
897 * to do the right thing in that case.
899 static int ocfs2_lock_create(struct ocfs2_super *osb,
900 struct ocfs2_lock_res *lockres,
905 enum dlm_status status;
910 mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
913 spin_lock_irqsave(&lockres->l_lock, flags);
914 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
915 (lockres->l_flags & OCFS2_LOCK_BUSY)) {
916 spin_unlock_irqrestore(&lockres->l_lock, flags);
920 lockres->l_action = OCFS2_AST_ATTACH;
921 lockres->l_requested = level;
922 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
923 spin_unlock_irqrestore(&lockres->l_lock, flags);
925 status = dlmlock(osb->dlm,
930 OCFS2_LOCK_ID_MAX_LEN - 1,
933 lockres->l_ops->bast);
934 if (status != DLM_NORMAL) {
935 ocfs2_log_dlm_error("dlmlock", status, lockres);
937 ocfs2_recover_from_dlm_error(lockres, 1);
940 mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
947 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
953 spin_lock_irqsave(&lockres->l_lock, flags);
954 ret = lockres->l_flags & flag;
955 spin_unlock_irqrestore(&lockres->l_lock, flags);
960 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
963 wait_event(lockres->l_event,
964 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
967 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
970 wait_event(lockres->l_event,
971 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
974 /* predict what lock level we'll be dropping down to on behalf
975 * of another node, and return true if the currently wanted
976 * level will be compatible with it. */
977 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
980 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
982 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
985 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
987 INIT_LIST_HEAD(&mw->mw_item);
988 init_completion(&mw->mw_complete);
991 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
993 wait_for_completion(&mw->mw_complete);
994 /* Re-arm the completion in case we want to wait on it again */
995 INIT_COMPLETION(mw->mw_complete);
996 return mw->mw_status;
999 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
1000 struct ocfs2_mask_waiter *mw,
1004 BUG_ON(!list_empty(&mw->mw_item));
1006 assert_spin_locked(&lockres->l_lock);
1008 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
1013 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1014 * if the mask still hadn't reached its goal */
1015 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1016 struct ocfs2_mask_waiter *mw)
1018 unsigned long flags;
1021 spin_lock_irqsave(&lockres->l_lock, flags);
1022 if (!list_empty(&mw->mw_item)) {
1023 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1026 list_del_init(&mw->mw_item);
1027 init_completion(&mw->mw_complete);
1029 spin_unlock_irqrestore(&lockres->l_lock, flags);
1035 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
1036 struct ocfs2_lock_res *lockres,
1041 struct ocfs2_mask_waiter mw;
1042 enum dlm_status status;
1043 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1044 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1045 unsigned long flags;
1049 ocfs2_init_mask_waiter(&mw);
1054 if (catch_signals && signal_pending(current)) {
1059 spin_lock_irqsave(&lockres->l_lock, flags);
1061 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1062 "Cluster lock called on freeing lockres %s! flags "
1063 "0x%lx\n", lockres->l_name, lockres->l_flags);
1065 /* We only compare against the currently granted level
1066 * here. If the lock is blocked waiting on a downconvert,
1067 * we'll get caught below. */
1068 if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1069 level > lockres->l_level) {
1070 /* is someone sitting in dlm_lock? If so, wait on
1072 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1077 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1078 /* lock has not been created yet. */
1079 spin_unlock_irqrestore(&lockres->l_lock, flags);
1081 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
1089 if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1090 !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1091 /* is the lock is currently blocked on behalf of
1093 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1098 if (level > lockres->l_level) {
1099 if (lockres->l_action != OCFS2_AST_INVALID)
1100 mlog(ML_ERROR, "lockres %s has action %u pending\n",
1101 lockres->l_name, lockres->l_action);
1103 lockres->l_action = OCFS2_AST_CONVERT;
1104 lockres->l_requested = level;
1105 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1106 spin_unlock_irqrestore(&lockres->l_lock, flags);
1108 BUG_ON(level == LKM_IVMODE);
1109 BUG_ON(level == LKM_NLMODE);
1111 mlog(0, "lock %s, convert from %d to level = %d\n",
1112 lockres->l_name, lockres->l_level, level);
1114 /* call dlm_lock to upgrade lock now */
1115 status = dlmlock(osb->dlm,
1118 lkm_flags|LKM_CONVERT|LKM_VALBLK,
1120 OCFS2_LOCK_ID_MAX_LEN - 1,
1121 lockres->l_ops->ast,
1123 lockres->l_ops->bast);
1124 if (status != DLM_NORMAL) {
1125 if ((lkm_flags & LKM_NOQUEUE) &&
1126 (status == DLM_NOTQUEUED))
1129 ocfs2_log_dlm_error("dlmlock", status,
1133 ocfs2_recover_from_dlm_error(lockres, 1);
1137 mlog(0, "lock %s, successfull return from dlmlock\n",
1140 /* At this point we've gone inside the dlm and need to
1141 * complete our work regardless. */
1144 /* wait for busy to clear and carry on */
1148 /* Ok, if we get here then we're good to go. */
1149 ocfs2_inc_holders(lockres, level);
1153 spin_unlock_irqrestore(&lockres->l_lock, flags);
1156 * This is helping work around a lock inversion between the page lock
1157 * and dlm locks. One path holds the page lock while calling aops
1158 * which block acquiring dlm locks. The voting thread holds dlm
1159 * locks while acquiring page locks while down converting data locks.
1160 * This block is helping an aop path notice the inversion and back
1161 * off to unlock its page lock before trying the dlm lock again.
1163 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1164 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1166 if (lockres_remove_mask_waiter(lockres, &mw))
1172 ret = ocfs2_wait_for_mask(&mw);
1182 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1183 struct ocfs2_lock_res *lockres,
1186 unsigned long flags;
1189 spin_lock_irqsave(&lockres->l_lock, flags);
1190 ocfs2_dec_holders(lockres, level);
1191 ocfs2_vote_on_unlock(osb, lockres);
1192 spin_unlock_irqrestore(&lockres->l_lock, flags);
1196 int ocfs2_create_new_lock(struct ocfs2_super *osb,
1197 struct ocfs2_lock_res *lockres,
1201 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1202 unsigned long flags;
1203 int lkm_flags = local ? LKM_LOCAL : 0;
1205 spin_lock_irqsave(&lockres->l_lock, flags);
1206 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1207 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1208 spin_unlock_irqrestore(&lockres->l_lock, flags);
1210 return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1213 /* Grants us an EX lock on the data and metadata resources, skipping
1214 * the normal cluster directory lookup. Use this ONLY on newly created
1215 * inodes which other nodes can't possibly see, and which haven't been
1216 * hashed in the inode hash yet. This can give us a good performance
1217 * increase as it'll skip the network broadcast normally associated
1218 * with creating a new lock resource. */
1219 int ocfs2_create_new_inode_locks(struct inode *inode)
1222 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1225 BUG_ON(!ocfs2_inode_is_new(inode));
1229 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1231 /* NOTE: That we don't increment any of the holder counts, nor
1232 * do we add anything to a journal handle. Since this is
1233 * supposed to be a new inode which the cluster doesn't know
1234 * about yet, there is no need to. As far as the LVB handling
1235 * is concerned, this is basically like acquiring an EX lock
1236 * on a resource which has an invalid one -- we'll set it
1237 * valid when we release the EX. */
1239 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1246 * We don't want to use LKM_LOCAL on a meta data lock as they
1247 * don't use a generation in their lock names.
1249 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1255 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1266 int ocfs2_rw_lock(struct inode *inode, int write)
1269 struct ocfs2_lock_res *lockres;
1275 mlog(0, "inode %llu take %s RW lock\n",
1276 (unsigned long long)OCFS2_I(inode)->ip_blkno,
1277 write ? "EXMODE" : "PRMODE");
1279 lockres = &OCFS2_I(inode)->ip_rw_lockres;
1281 level = write ? LKM_EXMODE : LKM_PRMODE;
1283 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
/* Drop the RW cluster lock; 'write' must match the level it was
 * acquired at in ocfs2_rw_lock(). */
1292 void ocfs2_rw_unlock(struct inode *inode, int write)
1294 int level = write ? LKM_EXMODE : LKM_PRMODE;
1295 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1299 mlog(0, "inode %llu drop %s RW lock\n",
1300 (unsigned long long)OCFS2_I(inode)->ip_blkno,
1301 write ? "EXMODE" : "PRMODE");
1303 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
/* Take the per-inode DATA cluster lock (EX for write, PR for read).
 * On a hard-readonly mount the lock is faked: no DLM call is made.
 * With OCFS2_LOCK_NONBLOCK in arg_flags this can return -EAGAIN,
 * which is deliberately not logged as an error below. */
1308 int ocfs2_data_lock_full(struct inode *inode,
1312 int status = 0, level;
1313 struct ocfs2_lock_res *lockres;
1319 mlog(0, "inode %llu take %s DATA lock\n",
1320 (unsigned long long)OCFS2_I(inode)->ip_blkno,
1321 write ? "EXMODE" : "PRMODE");
1323 /* We'll allow faking a readonly data lock for
1325 if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1333 lockres = &OCFS2_I(inode)->ip_data_lockres;
1335 level = write ? LKM_EXMODE : LKM_PRMODE;
1337 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
/* -EAGAIN is an expected result of a nonblocking attempt. */
1339 if (status < 0 && status != -EAGAIN)
1347 /* see ocfs2_meta_lock_with_page() */
/* Nonblocking data lock for callers holding a page lock. On -EAGAIN
 * we take and immediately drop a blocking lock (so it gets cached
 * here) and return AOP_TRUNCATED_PAGE so the VFS retries the aop. */
1348 int ocfs2_data_lock_with_page(struct inode *inode,
1354 ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1355 if (ret == -EAGAIN) {
1357 if (ocfs2_data_lock(inode, write) == 0)
1358 ocfs2_data_unlock(inode, write);
1359 ret = AOP_TRUNCATED_PAGE;
/* Called on unlock: if another node is blocked on this lockres and
 * our remaining holder counts no longer conflict with the blocking
 * level, kick the vote thread so the downconvert happens promptly. */
1365 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1366 struct ocfs2_lock_res *lockres)
1372 /* If we know that another node is waiting on our lock, kick
1373 * the vote thread pre-emptively when we reach a release
1375 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1376 switch(lockres->l_blocking) {
/* Blocking EX: any holder of ours conflicts. */
1378 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
/* Blocking PR: only our EX holders conflict. */
1382 if (!lockres->l_ex_holders)
1391 ocfs2_kick_vote_thread(osb);
/* Drop the DATA cluster lock. Hard-readonly mounts never took a real
 * DLM lock (see ocfs2_data_lock_full), so skip the unlock there. */
1396 void ocfs2_data_unlock(struct inode *inode,
1399 int level = write ? LKM_EXMODE : LKM_PRMODE;
1400 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1404 mlog(0, "inode %llu drop %s DATA lock\n",
1405 (unsigned long long)OCFS2_I(inode)->ip_blkno,
1406 write ? "EXMODE" : "PRMODE");
1408 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1409 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
/* A timespec is packed into the 64-bit LVB time fields as:
 * seconds in the high OCFS2_SEC_BITS bits, nanoseconds in the
 * low OCFS2_SEC_SHIFT bits. Derive SEC_SHIFT from SEC_BITS so the
 * two cannot drift apart (value is unchanged: 64 - 34 == 30). */
#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1418 /* LVB only has room for 64 bits of time here so we pack it for
/* Seconds go in the high bits, nanoseconds in the low bits; tv_sec
 * values wider than OCFS2_SEC_BITS are silently truncated. */
1420 static u64 ocfs2_pack_timespec(struct timespec *spec)
1423 u64 sec = spec->tv_sec;
1424 u32 nsec = spec->tv_nsec;
1426 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1431 /* Call this with the lockres locked. I am reasonably sure we don't
1432 * need ip_lock in this function as anyone who would be changing those
1433 * values is supposed to be blocked in ocfs2_meta_lock right now. */
/* Serialize the inode's cached metadata (size, clusters, uid/gid,
 * mode, nlink, packed times, attr, generation) into the meta lock's
 * LVB in big-endian form, so other nodes can refresh without a disk
 * read. Caller must hold the lockres spinlock. */
1434 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1436 struct ocfs2_inode_info *oi = OCFS2_I(inode);
1437 struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1438 struct ocfs2_meta_lvb *lvb;
1442 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1445 * Invalidate the LVB of a deleted inode - this way other
1446 * nodes are forced to go to disk and discover the new inode
1449 if (oi->ip_flags & OCFS2_INODE_DELETED) {
/* version 0 == "don't trust this LVB" (see ocfs2_meta_lvb_is_trustable) */
1450 lvb->lvb_version = 0;
1454 lvb->lvb_version = OCFS2_LVB_VERSION;
1455 lvb->lvb_isize = cpu_to_be64(i_size_read(inode));
1456 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1457 lvb->lvb_iuid = cpu_to_be32(inode->i_uid);
1458 lvb->lvb_igid = cpu_to_be32(inode->i_gid);
1459 lvb->lvb_imode = cpu_to_be16(inode->i_mode);
1460 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink);
1461 lvb->lvb_iatime_packed =
1462 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1463 lvb->lvb_ictime_packed =
1464 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1465 lvb->lvb_imtime_packed =
1466 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1467 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr);
1468 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1471 mlog_meta_lvb(0, lockres);
/* Inverse of ocfs2_pack_timespec(): split a packed 64-bit value back
 * into seconds (high bits) and nanoseconds (low bits). */
1476 static void ocfs2_unpack_timespec(struct timespec *spec,
1479 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1480 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
/* Populate the in-memory inode from the meta lock's LVB, avoiding a
 * disk read. Field updates are done under oi->ip_lock. Counterpart
 * of __ocfs2_stuff_meta_lvb(). */
1483 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1485 struct ocfs2_inode_info *oi = OCFS2_I(inode);
1486 struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1487 struct ocfs2_meta_lvb *lvb;
1491 mlog_meta_lvb(0, lockres);
1493 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1495 /* We're safe here without the lockres lock... */
1496 spin_lock(&oi->ip_lock);
1497 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1498 i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1500 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1501 ocfs2_set_inode_flags(inode);
1503 /* fast-symlinks are a special case */
1504 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1505 inode->i_blocks = 0;
1508 ocfs2_align_bytes_to_sectors(i_size_read(inode));
1510 inode->i_uid = be32_to_cpu(lvb->lvb_iuid);
1511 inode->i_gid = be32_to_cpu(lvb->lvb_igid);
1512 inode->i_mode = be16_to_cpu(lvb->lvb_imode);
1513 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink);
1514 ocfs2_unpack_timespec(&inode->i_atime,
1515 be64_to_cpu(lvb->lvb_iatime_packed));
1516 ocfs2_unpack_timespec(&inode->i_mtime,
1517 be64_to_cpu(lvb->lvb_imtime_packed));
1518 ocfs2_unpack_timespec(&inode->i_ctime,
1519 be64_to_cpu(lvb->lvb_ictime_packed));
1520 spin_unlock(&oi->ip_lock);
/* Nonzero when the LVB may be used instead of a disk read: its
 * version matches ours and its generation matches the inode's
 * (a deleted inode has its version zeroed, see __ocfs2_stuff_meta_lvb). */
1525 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1526 struct ocfs2_lock_res *lockres)
1528 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1530 if (lvb->lvb_version == OCFS2_LVB_VERSION
1531 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1536 /* Determine whether a lock resource needs to be refreshed, and
1537 * arbitrate who gets to refresh it.
1539 * 0 means no refresh needed.
1541 * > 0 means you need to refresh this and you MUST call
1542 * ocfs2_complete_lock_res_refresh afterwards. */
/* Arbitrate the refresh: return 0 when no refresh is needed (or
 * another thread finished it while we waited); positive when this
 * caller has claimed the REFRESHING state and MUST later call
 * ocfs2_complete_lock_res_refresh(). */
1543 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1545 unsigned long flags;
1551 spin_lock_irqsave(&lockres->l_lock, flags);
1552 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1553 spin_unlock_irqrestore(&lockres->l_lock, flags);
/* Someone else is refreshing; wait for them and re-evaluate. */
1557 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1558 spin_unlock_irqrestore(&lockres->l_lock, flags);
1560 ocfs2_wait_on_refreshing_lock(lockres);
1564 /* Ok, I'll be the one to refresh this lock. */
1565 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1566 spin_unlock_irqrestore(&lockres->l_lock, flags);
1574 /* If status is non-zero, I'll mark it as not being in refresh
1575 * anymore, but I won't clear the needs refresh flag. */
/* Finish a refresh claimed via ocfs2_should_refresh_lock_res():
 * always clear REFRESHING; clear NEEDS_REFRESH only on success so a
 * failed refresh will be retried. Wakes waiters on l_event. */
1576 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1579 unsigned long flags;
1582 spin_lock_irqsave(&lockres->l_lock, flags);
1583 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1585 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1586 spin_unlock_irqrestore(&lockres->l_lock, flags);
1588 wake_up(&lockres->l_event);
1593 /* may or may not return a bh if it went to disk. */
/* Refresh cached inode metadata after acquiring the meta lock.
 * If the LVB is trustable the inode is refreshed from it; otherwise
 * the dinode block is read (*bh may be returned to the caller).
 * Sanity-checks the on-disk dinode against the in-memory inode. */
1594 static int ocfs2_meta_lock_update(struct inode *inode,
1595 struct buffer_head **bh)
1598 struct ocfs2_inode_info *oi = OCFS2_I(inode);
1599 struct ocfs2_lock_res *lockres;
1600 struct ocfs2_dinode *fe;
/* Bail out early if the inode was wiped while we slept on the lock. */
1604 spin_lock(&oi->ip_lock);
1605 if (oi->ip_flags & OCFS2_INODE_DELETED) {
1606 mlog(0, "Orphaned inode %llu was deleted while we "
1607 "were waiting on a lock. ip_flags = 0x%x\n",
1608 (unsigned long long)oi->ip_blkno, oi->ip_flags);
1609 spin_unlock(&oi->ip_lock);
1613 spin_unlock(&oi->ip_lock);
1615 lockres = &oi->ip_meta_lockres;
1617 if (!ocfs2_should_refresh_lock_res(lockres))
1620 /* This will discard any caching information we might have had
1621 * for the inode metadata. */
1622 ocfs2_metadata_cache_purge(inode);
1624 /* will do nothing for inode types that don't use the extent
1625 * map (directories, bitmap files, etc) */
1626 ocfs2_extent_map_trunc(inode, 0);
1628 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1629 mlog(0, "Trusting LVB on inode %llu\n",
1630 (unsigned long long)oi->ip_blkno);
1631 ocfs2_refresh_inode_from_lvb(inode);
1633 /* Boo, we have to go to disk. */
1634 /* read bh, cast, ocfs2_refresh_inode */
1635 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1636 bh, OCFS2_BH_CACHED, inode);
1641 fe = (struct ocfs2_dinode *) (*bh)->b_data;
1643 /* This is a good chance to make sure we're not
1644 * locking an invalid object.
1646 * We bug on a stale inode here because we checked
1647 * above whether it was wiped from disk. The wiping
1648 * node provides a guarantee that we receive that
1649 * message and can mark the inode before dropping any
1650 * locks associated with it. */
1651 if (!OCFS2_IS_VALID_DINODE(fe)) {
1652 OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1656 mlog_bug_on_msg(inode->i_generation !=
1657 le32_to_cpu(fe->i_generation),
1658 "Invalid dinode %llu disk generation: %u "
1659 "inode->i_generation: %u\n",
1660 (unsigned long long)oi->ip_blkno,
1661 le32_to_cpu(fe->i_generation),
1662 inode->i_generation);
1663 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1664 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1665 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
1666 (unsigned long long)oi->ip_blkno,
1667 (unsigned long long)le64_to_cpu(fe->i_dtime),
1668 le32_to_cpu(fe->i_flags));
1670 ocfs2_refresh_inode(inode, fe);
/* Release the REFRESHING claim taken above, pass status through. */
1675 ocfs2_complete_lock_res_refresh(lockres, status);
/* Hand the caller a buffer head for the inode's dinode block: reuse
 * passed_bh if the lock update already read it, otherwise read it
 * from the (possibly cached) block layer now. */
1681 static int ocfs2_assign_bh(struct inode *inode,
1682 struct buffer_head **ret_bh,
1683 struct buffer_head *passed_bh)
1688 /* Ok, the update went to disk for us, use the
1690 *ret_bh = passed_bh;
1696 status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1697 OCFS2_I(inode)->ip_blkno,
1708 * returns < 0 error if the callback will never be called, otherwise
1709 * the result of the lock will be communicated via the callback.
/* Take the per-inode META cluster lock, refresh the cached inode
 * metadata, optionally return the dinode bh and attach the lock to
 * a journal handle. arg_flags: OCFS2_META_LOCK_RECOVERY skips the
 * recovery-map waits; OCFS2_META_LOCK_NOQUEUE adds LKM_NOQUEUE.
 * Hard-readonly mounts fake the lock (no DLM call). */
1711 int ocfs2_meta_lock_full(struct inode *inode,
1712 struct ocfs2_journal_handle *handle,
1713 struct buffer_head **ret_bh,
1717 int status, level, dlm_flags, acquired;
1718 struct ocfs2_lock_res *lockres;
1719 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1720 struct buffer_head *local_bh = NULL;
1726 mlog(0, "inode %llu, take %s META lock\n",
1727 (unsigned long long)OCFS2_I(inode)->ip_blkno,
1728 ex ? "EXMODE" : "PRMODE");
1732 /* We'll allow faking a readonly metadata lock for
1734 if (ocfs2_is_hard_readonly(osb)) {
/* Don't race recovery unless we ARE the recovery path. */
1740 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1741 wait_event(osb->recovery_event,
1742 ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1745 lockres = &OCFS2_I(inode)->ip_meta_lockres;
1746 level = ex ? LKM_EXMODE : LKM_PRMODE;
1748 if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1749 dlm_flags |= LKM_NOQUEUE;
1751 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
/* -EAGAIN (nonblock) and -EIOCBRETRY are expected, don't log them. */
1753 if (status != -EAGAIN && status != -EIOCBRETRY)
1758 /* Notify the error cleanup path to drop the cluster lock. */
1761 /* We wait twice because a node may have died while we were in
1762 * the lower dlm layers. The second time though, we've
1763 * committed to owning this lock so we don't allow signals to
1764 * abort the operation. */
1765 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1766 wait_event(osb->recovery_event,
1767 ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1770 * We only see this flag if we're being called from
1771 * ocfs2_read_locked_inode(). It means we're locking an inode
1772 * which hasn't been populated yet, so clear the refresh flag
1773 * and let the caller handle it.
1775 if (inode->i_state & I_NEW) {
1777 ocfs2_complete_lock_res_refresh(lockres, 0);
1781 /* This is fun. The caller may want a bh back, or it may
1782 * not. ocfs2_meta_lock_update definitely wants one in, but
1783 * may or may not read one, depending on what's in the
1784 * LVB. The result of all of this is that we've *only* gone to
1785 * disk if we have to, so the complexity is worthwhile. */
1786 status = ocfs2_meta_lock_update(inode, &local_bh);
1788 if (status != -ENOENT)
1794 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1802 status = ocfs2_handle_add_lock(handle, inode);
/* Error path: drop any bh we handed out and undo the lock. */
1809 if (ret_bh && (*ret_bh)) {
1814 ocfs2_meta_unlock(inode, ex);
1825 * This is working around a lock inversion between tasks acquiring DLM locks
1826 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1827 * while acquiring page locks.
1829 * ** These _with_page variants are only intended to be called from aop
1830 * methods that hold page locks and return a very specific *positive* error
1831 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1833 * The DLM is called such that it returns -EAGAIN if it would have blocked
1834 * waiting for the vote thread. In that case we unlock our page so the vote
1835 * thread can make progress. Once we've done this we have to return
1836 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1837 * into the VFS who will then immediately retry the aop call.
1839 * We do a blocking lock and immediate unlock before returning, though, so that
1840 * the lock has a great chance of being cached on this node by the time the VFS
1841 * calls back to retry the aop. This has a potential to livelock as nodes
1842 * ping locks back and forth, but that's a risk we're willing to take to avoid
1843 * the lock inversion simply.
/* Nonblocking meta lock for callers holding a page lock; see the big
 * comment above. On -EAGAIN, take/drop a blocking lock so it gets
 * cached here, then return AOP_TRUNCATED_PAGE so the VFS retries. */
1845 int ocfs2_meta_lock_with_page(struct inode *inode,
1846 struct ocfs2_journal_handle *handle,
1847 struct buffer_head **ret_bh,
1853 ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1854 OCFS2_LOCK_NONBLOCK);
1855 if (ret == -EAGAIN) {
1857 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1858 ocfs2_meta_unlock(inode, ex);
1859 ret = AOP_TRUNCATED_PAGE;
/* Drop the META cluster lock. Hard-readonly mounts never took a real
 * DLM lock (see ocfs2_meta_lock_full), so skip the unlock there. */
1865 void ocfs2_meta_unlock(struct inode *inode,
1868 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1869 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1873 mlog(0, "inode %llu drop %s META lock\n",
1874 (unsigned long long)OCFS2_I(inode)->ip_blkno,
1875 ex ? "EXMODE" : "PRMODE");
1877 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1878 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
/* Take the superblock cluster lock and, if this node won the refresh
 * arbitration, re-read the slot map so slot info stays current. */
1883 int ocfs2_super_lock(struct ocfs2_super *osb,
1887 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1888 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1889 struct buffer_head *bh;
1890 struct ocfs2_slot_info *si = osb->slot_info;
1894 if (ocfs2_is_hard_readonly(osb))
1897 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1903 /* The super block lock path is really in the best position to
1904 * know when resources covered by the lock need to be
1905 * refreshed, so we do it here. Of course, making sense of
1906 * everything is up to the caller :) */
1907 status = ocfs2_should_refresh_lock_res(lockres);
1914 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1917 ocfs2_update_slot_info(si);
1919 ocfs2_complete_lock_res_refresh(lockres, status);
/* Drop the superblock cluster lock at the level it was taken. */
1929 void ocfs2_super_unlock(struct ocfs2_super *osb,
1932 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1933 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1935 ocfs2_cluster_unlock(osb, lockres, level);
/* Take the cluster-wide rename lock (always EX); faked (skipped) on
 * hard-readonly mounts. */
1938 int ocfs2_rename_lock(struct ocfs2_super *osb)
1941 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1943 if (ocfs2_is_hard_readonly(osb))
1946 status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
/* Drop the cluster-wide rename lock (EX). */
1953 void ocfs2_rename_unlock(struct ocfs2_super *osb)
1955 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1957 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
/* Take the dentry cluster lock stored in dentry->d_fsdata; faked
 * (skipped) on hard-readonly mounts. */
1960 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1963 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1964 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1965 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1969 if (ocfs2_is_hard_readonly(osb))
1972 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
/* Drop the dentry cluster lock at the level it was taken. */
1979 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1981 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1982 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1983 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1985 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1988 /* Reference counting of the dlm debug structure. We want this because
1989 * open references on the debug inodes can live on after a mount, so
1990 * we can't rely on the ocfs2_super to always exist. */
/* kref release callback: frees the debug structure when the last
 * reference is dropped. */
1991 static void ocfs2_dlm_debug_free(struct kref *kref)
1993 struct ocfs2_dlm_debug *dlm_debug;
1995 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
/* Drop a reference; frees via ocfs2_dlm_debug_free() on the last put. */
2000 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2003 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
/* Take an additional reference on the debug structure. */
2006 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2008 kref_get(&debug->d_refcnt);
/* Allocate and initialize a debug structure with one reference and an
 * empty lockres tracking list. Returns NULL-equivalent path on
 * allocation failure (mlog_errno visible below). */
2011 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2013 struct ocfs2_dlm_debug *dlm_debug;
2015 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2017 mlog_errno(-ENOMEM);
2021 kref_init(&dlm_debug->d_refcnt);
2022 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2023 dlm_debug->d_locking_state = NULL;
2028 /* Access to this is arbitrated for us via seq_file->sem. */
/* Per-open iterator state for the locking_state seq_file:
 * p_iter_res is a dummy lockres used as a list cursor, p_tmp_res
 * holds a snapshot copy of the lockres being shown. */
2029 struct ocfs2_dlm_seq_priv {
2030 struct ocfs2_dlm_debug *p_dlm_debug;
2031 struct ocfs2_lock_res p_iter_res;
2032 struct ocfs2_lock_res p_tmp_res;
/* Walk the tracking list from 'start' to the next real lockres,
 * skipping dummy iteration cursors (recognized by l_ops == NULL).
 * Returns NULL at the end of the list. Caller must hold
 * ocfs2_dlm_tracking_lock. */
2035 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2036 struct ocfs2_dlm_seq_priv *priv)
2038 struct ocfs2_lock_res *iter, *ret = NULL;
2039 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2041 assert_spin_locked(&ocfs2_dlm_tracking_lock);
2043 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2044 /* discover the head of the list */
2045 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2046 mlog(0, "End of list found, %p\n", ret);
2050 /* We track our "dummy" iteration lockres' by a NULL
2052 if (iter->l_ops != NULL) {
/* seq_file .start: find the next real lockres after our cursor and
 * return a snapshot copy of it (taken under the tracking spinlock). */
2061 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2063 struct ocfs2_dlm_seq_priv *priv = m->private;
2064 struct ocfs2_lock_res *iter;
2066 spin_lock(&ocfs2_dlm_tracking_lock);
2067 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2069 /* Since lockres' have the lifetime of their container
2070 * (which can be inodes, ocfs2_supers, etc) we want to
2071 * copy this out to a temporary lockres while still
2072 * under the spinlock. Obviously after this we can't
2073 * trust any pointers on the copy returned, but that's
2074 * ok as the information we want isn't typically held
2076 priv->p_tmp_res = *iter;
2077 iter = &priv->p_tmp_res;
2079 spin_unlock(&ocfs2_dlm_tracking_lock);
/* seq_file .stop: nothing to release here. */
2084 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
/* seq_file .next: advance to the following real lockres, move our
 * dummy cursor to just after it in the tracking list, and hand back a
 * snapshot copy -- all under the tracking spinlock. */
2088 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2090 struct ocfs2_dlm_seq_priv *priv = m->private;
2091 struct ocfs2_lock_res *iter = v;
2092 struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2094 spin_lock(&ocfs2_dlm_tracking_lock);
2095 iter = ocfs2_dlm_next_res(iter, priv);
2096 list_del_init(&dummy->l_debug_list);
2098 list_add(&dummy->l_debug_list, &iter->l_debug_list);
2099 priv->p_tmp_res = *iter;
2100 iter = &priv->p_tmp_res;
2102 spin_unlock(&ocfs2_dlm_tracking_lock);
2107 /* So that debugfs.ocfs2 can determine which format is being used */
2108 #define OCFS2_DLM_DEBUG_STR_VERSION 1
/* seq_file .show: emit one tab-separated record per lockres --
 * version, name (dentry locks get their inode number appended),
 * state counters and the raw LVB bytes. Parsed by debugfs.ocfs2. */
2109 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2113 struct ocfs2_lock_res *lockres = v;
2118 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2120 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2121 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2123 (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2125 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2127 seq_printf(m, "%d\t"
2138 lockres->l_unlock_action,
2139 lockres->l_ro_holders,
2140 lockres->l_ex_holders,
2141 lockres->l_requested,
2142 lockres->l_blocking);
2144 /* Dump the raw LVB */
2145 lvb = lockres->l_lksb.lvb;
2146 for(i = 0; i < DLM_LVB_LEN; i++)
2147 seq_printf(m, "0x%x\t", lvb[i]);
2150 seq_printf(m, "\n");
/* seq_file iterator callbacks for the locking_state debugfs file. */
2154 static struct seq_operations ocfs2_dlm_seq_ops = {
2155 .start = ocfs2_dlm_seq_start,
2156 .stop = ocfs2_dlm_seq_stop,
2157 .next = ocfs2_dlm_seq_next,
2158 .show = ocfs2_dlm_seq_show,
/* debugfs .release: unhook our iteration cursor from the tracking
 * list, drop the debug-structure reference, free seq_file state. */
2161 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2163 struct seq_file *seq = (struct seq_file *) file->private_data;
2164 struct ocfs2_dlm_seq_priv *priv = seq->private;
2165 struct ocfs2_lock_res *res = &priv->p_iter_res;
2167 ocfs2_remove_lockres_tracking(res);
2168 ocfs2_put_dlm_debug(priv->p_dlm_debug);
2169 return seq_release_private(inode, file);
/* debugfs .open: allocate per-open iterator state, pin the debug
 * structure, set up the seq_file and add our dummy cursor lockres to
 * the tracking list. */
2172 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2175 struct ocfs2_dlm_seq_priv *priv;
2176 struct seq_file *seq;
2177 struct ocfs2_super *osb;
2179 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
/* The debugfs inode's private pointer is the ocfs2_super. */
2185 osb = (struct ocfs2_super *) inode->u.generic_ip;
2186 ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2187 priv->p_dlm_debug = osb->osb_dlm_debug;
2188 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2190 ret = seq_open(file, &ocfs2_dlm_seq_ops);
2197 seq = (struct seq_file *) file->private_data;
2198 seq->private = priv;
2200 ocfs2_add_lockres_tracking(&priv->p_iter_res,
/* file_operations for the "locking_state" debugfs entry. */
2207 static const struct file_operations ocfs2_dlm_debug_fops = {
2208 .open = ocfs2_dlm_debug_open,
2209 .release = ocfs2_dlm_debug_release,
2211 .llseek = seq_lseek,
/* Create the per-mount "locking_state" debugfs file and take a
 * reference on the debug structure for it. */
2214 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2217 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2219 dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2221 osb->osb_debug_root,
2223 &ocfs2_dlm_debug_fops);
2224 if (!dlm_debug->d_locking_state) {
2227 "Unable to create locking state debugfs file.\n");
2231 ocfs2_get_dlm_debug(dlm_debug);
/* Remove the debugfs file and drop the reference taken by
 * ocfs2_dlm_init_debug(). */
2236 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2238 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2241 debugfs_remove(dlm_debug->d_locking_state);
2242 ocfs2_put_dlm_debug(dlm_debug);
/* Per-mount DLM bring-up: debugfs state, vote thread, domain
 * registration (domain name == uuid, key == crc32 of uuid), the
 * superblock/rename lock resources and the eviction callback.
 * Error paths below unwind in reverse order. */
2246 int ocfs2_dlm_init(struct ocfs2_super *osb)
2250 struct dlm_ctxt *dlm;
2254 status = ocfs2_dlm_init_debug(osb);
2260 /* launch vote thread */
2261 osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2262 if (IS_ERR(osb->vote_task)) {
2263 status = PTR_ERR(osb->vote_task);
2264 osb->vote_task = NULL;
2269 /* used by the dlm code to make message headers unique, each
2270 * node in this domain must agree on this. */
2271 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2273 /* for now, uuid == domain */
2274 dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2276 status = PTR_ERR(dlm);
2281 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2282 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2284 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2291 ocfs2_dlm_shutdown_debug(osb);
2293 kthread_stop(osb->vote_task);
/* Per-mount DLM teardown, reversing ocfs2_dlm_init(): eviction
 * callback, osb-level locks, vote thread, lockres frees, domain
 * unregistration, debugfs state. */
2300 void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2304 dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2306 ocfs2_drop_osb_locks(osb);
2308 if (osb->vote_task) {
2309 kthread_stop(osb->vote_task);
2310 osb->vote_task = NULL;
2313 ocfs2_lock_res_free(&osb->osb_super_lockres);
2314 ocfs2_lock_res_free(&osb->osb_rename_lockres);
2316 dlm_unregister_domain(osb->dlm);
2319 ocfs2_dlm_shutdown_debug(osb);
/* DLM unlock AST: completes either a cancelled convert or a full
 * drop, clearing BUSY / l_unlock_action under the lockres spinlock
 * and waking waiters on l_event. */
2324 static void ocfs2_unlock_ast_func(void *opaque, enum dlm_status status)
2326 struct ocfs2_lock_res *lockres = opaque;
2327 unsigned long flags;
2331 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2332 lockres->l_unlock_action);
2334 spin_lock_irqsave(&lockres->l_lock, flags);
2335 /* We tried to cancel a convert request, but it was already
2336 * granted. All we want to do here is clear our unlock
2337 * state. The wake_up call done at the bottom is redundant
2338 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2339 * hurt anything anyway */
2340 if (status == DLM_CANCELGRANT &&
2341 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2342 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2344 /* We don't clear the busy flag in this case as it
2345 * should have been cleared by the ast which the dlm
2347 goto complete_unlock;
2350 if (status != DLM_NORMAL) {
2351 mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2352 "unlock_action %d\n", status, lockres->l_name,
2353 lockres->l_unlock_action);
2354 spin_unlock_irqrestore(&lockres->l_lock, flags);
2358 switch(lockres->l_unlock_action) {
2359 case OCFS2_UNLOCK_CANCEL_CONVERT:
2360 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
/* Forget the convert that was in flight. */
2361 lockres->l_action = OCFS2_AST_INVALID;
2363 case OCFS2_UNLOCK_DROP_LOCK:
2364 lockres->l_level = LKM_IVMODE;
2370 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2372 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2373 spin_unlock_irqrestore(&lockres->l_lock, flags);
2375 wake_up(&lockres->l_event);
/* Optional callback run by ocfs2_drop_lock() just before the final
 * dlmunlock, e.g. to stuff the meta LVB (see ocfs2_meta_pre_drop). */
2380 typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);
2382 struct drop_lock_cb {
2383 ocfs2_pre_drop_cb_t *drop_func;
/* Tear down a lockres: wait out any in-flight DLM operation, run the
 * optional pre-drop callback, then dlmunlock with LKM_VALBLK and wait
 * for the unlock AST. Lockres must already be marked FREEING; an
 * unattached lockres is a no-op. */
2387 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2388 struct ocfs2_lock_res *lockres,
2389 struct drop_lock_cb *dcb)
2391 enum dlm_status status;
2392 unsigned long flags;
2394 /* We didn't get anywhere near actually using this lockres. */
2395 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2398 spin_lock_irqsave(&lockres->l_lock, flags);
2400 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2401 "lockres %s, flags 0x%lx\n",
2402 lockres->l_name, lockres->l_flags);
2404 while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2405 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2406 "%u, unlock_action = %u\n",
2407 lockres->l_name, lockres->l_flags, lockres->l_action,
2408 lockres->l_unlock_action);
2410 spin_unlock_irqrestore(&lockres->l_lock, flags);
2412 /* XXX: Today we just wait on any busy
2413 * locks... Perhaps we need to cancel converts in the
2415 ocfs2_wait_on_busy_lock(lockres);
2417 spin_lock_irqsave(&lockres->l_lock, flags);
/* Give the owner a last chance to publish state (e.g. LVB). */
2421 dcb->drop_func(lockres, dcb->drop_data);
2423 if (lockres->l_flags & OCFS2_LOCK_BUSY)
2424 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2426 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2427 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2429 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2430 spin_unlock_irqrestore(&lockres->l_lock, flags);
2434 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2436 /* make sure we never get here while waiting for an ast to
2438 BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2440 /* is this necessary? */
2441 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2442 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2443 spin_unlock_irqrestore(&lockres->l_lock, flags);
2445 mlog(0, "lock %s\n", lockres->l_name);
2447 status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK,
2448 lockres->l_ops->unlock_ast, lockres);
2449 if (status != DLM_NORMAL) {
2450 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2451 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2452 dlm_print_one_lock(lockres->l_lksb.lockid);
2455 mlog(0, "lock %s, successfull return from dlmunlock\n",
/* Block until the unlock AST clears BUSY. */
2458 ocfs2_wait_on_busy_lock(lockres);
2464 /* Mark the lockres as being dropped. It will no longer be
2465 * queued if blocking, but we still may have to wait on it
2466 * being dequeued from the vote thread before we can consider
2469 * You can *not* attempt to call cluster_lock on this lockres anymore. */
2470 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2473 struct ocfs2_mask_waiter mw;
2474 unsigned long flags;
2476 ocfs2_init_mask_waiter(&mw);
2478 spin_lock_irqsave(&lockres->l_lock, flags);
2479 lockres->l_flags |= OCFS2_LOCK_FREEING;
/* Wait (via a mask waiter) until the vote thread dequeues it. */
2480 while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2481 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2482 spin_unlock_irqrestore(&lockres->l_lock, flags);
2484 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2486 status = ocfs2_wait_for_mask(&mw);
2490 spin_lock_irqsave(&lockres->l_lock, flags);
2492 spin_unlock_irqrestore(&lockres->l_lock, flags);
/* Mark a lockres freeing and drop it, with no pre-drop callback. */
2495 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2496 struct ocfs2_lock_res *lockres)
2500 ocfs2_mark_lockres_freeing(lockres);
2501 ret = ocfs2_drop_lock(osb, lockres, NULL);
/* Drop the two osb-level (superblock and rename) lock resources. */
2506 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2508 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2509 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
/* Pre-drop callback for the meta lock: if we hold a valid, refreshed
 * EX, publish our cached inode metadata into the LVB so other nodes
 * see it after we drop. */
2512 static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2514 struct inode *inode = data;
2516 /* the metadata lock requires a bit more work as we have an
2517 * LVB to worry about. */
2518 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2519 lockres->l_level == LKM_EXMODE &&
2520 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2521 __ocfs2_stuff_meta_lvb(inode);
/* Drop all three inode lock resources (data, meta, rw), keeping the
 * first error seen while still attempting the remaining drops. The
 * meta drop uses ocfs2_meta_pre_drop to stuff the LVB first. */
2524 int ocfs2_drop_inode_locks(struct inode *inode)
2527 struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
2531 /* No need to call ocfs2_mark_lockres_freeing here -
2532 * ocfs2_clear_inode has done it for us. */
2534 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2535 &OCFS2_I(inode)->ip_data_lockres,
2542 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2543 &OCFS2_I(inode)->ip_meta_lockres,
2547 if (err < 0 && !status)
2550 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2551 &OCFS2_I(inode)->ip_rw_lockres,
2555 if (err < 0 && !status)
/* Record a pending downconvert on the lockres: set the DOWNCONVERT
 * action, the requested level and the BUSY flag. Caller holds the
 * lockres spinlock; new_level must be strictly below l_level. */
2562 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2565 assert_spin_locked(&lockres->l_lock);
2567 BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2569 if (lockres->l_level <= new_level) {
2570 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2571 lockres->l_level, new_level);
2575 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2576 lockres->l_name, new_level, lockres->l_blocking);
2578 lockres->l_action = OCFS2_AST_DOWNCONVERT;
2579 lockres->l_requested = new_level;
2580 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
/* Issue the DLM convert for a prepared downconvert. LKM_VALBLK is
 * added when the caller asked for the LVB to be written back. On a
 * DLM error the lockres state set by prepare_downconvert is undone
 * via ocfs2_recover_from_dlm_error(). */
2583 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2584 struct ocfs2_lock_res *lockres,
2588 int ret, dlm_flags = LKM_CONVERT;
2589 enum dlm_status status;
2594 dlm_flags |= LKM_VALBLK;
2596 status = dlmlock(osb->dlm,
2601 OCFS2_LOCK_ID_MAX_LEN - 1,
2602 lockres->l_ops->ast,
2604 lockres->l_ops->bast);
2605 if (status != DLM_NORMAL) {
2606 ocfs2_log_dlm_error("dlmlock", status, lockres);
2608 ocfs2_recover_from_dlm_error(lockres, 1);
2618 /* returns 1 when the caller should unlock and call dlmunlock */
/* Set up cancellation of an in-flight convert. Caller holds the
 * lockres spinlock. Returns nonzero (see comment above) when a
 * dlmunlock(LKM_CANCEL) should follow; skips if a cancel is already
 * pending. */
2619 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2620 struct ocfs2_lock_res *lockres)
2622 assert_spin_locked(&lockres->l_lock);
2625 mlog(0, "lock %s\n", lockres->l_name);
2627 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2628 /* If we're already trying to cancel a lock conversion
2629 * then just drop the spinlock and allow the caller to
2630 * requeue this lock. */
2632 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2636 /* were we in a convert when we got the bast fire? */
2637 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2638 lockres->l_action != OCFS2_AST_DOWNCONVERT);
2639 /* set things up for the unlockast to know to just
2640 * clear out the ast_action and unset busy, etc. */
2641 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2643 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2644 "lock %s, invalid flags: 0x%lx\n",
2645 lockres->l_name, lockres->l_flags);
/* Issue the dlmunlock that cancels a convert prepared by
 * ocfs2_prepare_cancel_convert(); completion arrives via the unlock
 * AST (possibly as DLM_CANCELGRANT if the convert already won). */
2650 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2651 struct ocfs2_lock_res *lockres)
2654 enum dlm_status status;
2657 mlog(0, "lock %s\n", lockres->l_name);
2660 status = dlmunlock(osb->dlm,
2663 lockres->l_ops->unlock_ast,
2665 if (status != DLM_NORMAL) {
2666 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2668 ocfs2_recover_from_dlm_error(lockres, 0);
2671 mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
/* Nonzero when the meta lock may be downconverted to new_level now:
 * never while refreshing; to PR only without EX holders; to NL only
 * with no holders at all. Both downconverts also require the inode's
 * journaled metadata to be fully checkpointed. */
2677 static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
2678 struct ocfs2_lock_res *lockres,
2685 BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2687 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2689 mlog(0, "lockres %s currently being refreshed -- backing "
2690 "off!\n", lockres->l_name);
2691 } else if (new_level == LKM_PRMODE)
2692 ret = !lockres->l_ex_holders &&
2693 ocfs2_inode_fully_checkpointed(inode);
2694 else /* Must be NLMODE we're converting to. */
2695 ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
2696 ocfs2_inode_fully_checkpointed(inode);
/*
 * Unblock path for the inode metadata lock: cancel a busy conversion,
 * or downconvert to the highest level compatible with what another
 * node is requesting (l_blocking), stuffing the LVB with our cached
 * inode values when we drop from EX.
 *
 * If the downconvert cannot proceed because the inode is not fully
 * checkpointed, a checkpoint is kicked off and the lock is left for a
 * later requeue.
 */
2702 static int ocfs2_do_unblock_meta(struct inode *inode,
2708 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
2709 unsigned long flags;
2711 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2715 spin_lock_irqsave(&lockres->l_lock, flags);
/* Only called for locks another node has asked us to release. */
2717 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2719 mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
2720 lockres->l_blocking);
2722 BUG_ON(lockres->l_level != LKM_EXMODE &&
2723 lockres->l_level != LKM_PRMODE);
/* A conversion is already outstanding -- cancel it (unlocked) and
 * let the caller requeue rather than stacking a downconvert. */
2725 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2727 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2728 spin_unlock_irqrestore(&lockres->l_lock, flags);
2730 ret = ocfs2_cancel_convert(osb, lockres);
2737 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2739 mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
2740 lockres->l_level, lockres->l_blocking, new_level);
2742 if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
/* Dropping from EX: our node owns the authoritative inode state,
 * so publish it via the LVB (set_lvb handling partly elided here). */
2743 if (lockres->l_level == LKM_EXMODE)
2746 /* If the lock hasn't been refreshed yet (rare), then
2747 * our memory inode values are old and we skip
2748 * stuffing the lvb. There's no need to actually clear
2749 * out the lvb here as it's value is still valid. */
2750 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2752 __ocfs2_stuff_meta_lvb(inode);
2754 mlog(0, "lockres %s: downconverting stale lock!\n",
2757 mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
2758 "l_blocking=%d, new_level=%d\n",
2759 lockres->l_level, lockres->l_blocking, new_level);
/* Mark the lockres busy/downconverting, then drop the spinlock
 * before calling into the DLM. */
2761 ocfs2_prepare_downconvert(lockres, new_level);
2762 spin_unlock_irqrestore(&lockres->l_lock, flags);
2763 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
/* Can't downconvert yet: make forward progress by forcing a
 * journal checkpoint so a later pass can succeed. */
2766 if (!ocfs2_inode_fully_checkpointed(inode))
2767 ocfs2_start_checkpoint(osb);
2770 spin_unlock_irqrestore(&lockres->l_lock, flags);
/*
 * Generic unblock/downconvert engine shared by the data, inode,
 * dentry and superblock lock types.
 *
 * Sequence: cancel any busy conversion; requeue while incompatible
 * local holders remain; optionally run a per-type @worker (which may
 * sleep, so the spinlock is dropped around it); then downconvert to
 * the highest level compatible with l_blocking. @worker may be NULL.
 * Results are reported through @ctl (requeue / unblock_action).
 */
2777 static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
2778 struct ocfs2_lock_res *lockres,
2779 struct ocfs2_unblock_ctl *ctl,
2780 ocfs2_convert_worker_t *worker)
2782 unsigned long flags;
2789 spin_lock_irqsave(&lockres->l_lock, flags);
2791 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
/* A conversion is in flight -- cancel it instead of downconverting. */
2794 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2796 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2797 spin_unlock_irqrestore(&lockres->l_lock, flags);
2799 ret = ocfs2_cancel_convert(osb, lockres);
2806 /* if we're blocking an exclusive and we have *any* holders,
/* ...then we must wait: requeue for a later pass. */
2808 if ((lockres->l_blocking == LKM_EXMODE)
2809 && (lockres->l_ex_holders || lockres->l_ro_holders)) {
2810 spin_unlock_irqrestore(&lockres->l_lock, flags);
2816 /* If it's a PR we're blocking, then only
2817 * requeue if we've got any EX holders */
2818 if (lockres->l_blocking == LKM_PRMODE &&
2819 lockres->l_ex_holders) {
2820 spin_unlock_irqrestore(&lockres->l_lock, flags);
2826 /* If we get here, then we know that there are no more
2827 * incompatible holders (and anyone asking for an incompatible
2828 * lock is blocked). We can now downconvert the lock */
2832 /* Some lockres types want to do a bit of work before
2833 * downconverting a lock. Allow that here. The worker function
2834 * may sleep, so we save off a copy of what we're blocking as
2835 * it may change while we're not holding the spin lock. */
2836 blocking = lockres->l_blocking;
2837 spin_unlock_irqrestore(&lockres->l_lock, flags);
2839 ctl->unblock_action = worker(lockres, blocking);
/* The worker can abort the downconvert entirely (dentry case). */
2841 if (ctl->unblock_action == UNBLOCK_STOP_POST)
2844 spin_lock_irqsave(&lockres->l_lock, flags);
2845 if (blocking != lockres->l_blocking) {
2846 /* If this changed underneath us, then we can't drop
/* (remainder of the recheck elided in this view -- the lock is
 * requeued rather than downconverted with stale information). */
2853 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2855 ocfs2_prepare_downconvert(lockres, new_level);
2856 spin_unlock_irqrestore(&lockres->l_lock, flags);
/* Final arg 0: the generic path never sets the LVB on downconvert. */
2857 ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
/*
 * Pre-downconvert worker for the inode data lock: push dirty pages to
 * disk before another node is allowed in. When we are blocking an EX
 * request the cache must also be emptied (truncate + unmap), because
 * the other node may change the data; for lesser requests the pages
 * stay cached and we only wait for writeback.
 *
 * Runs without the lockres spinlock held; may sleep.
 */
2863 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2866 struct inode *inode;
2867 struct address_space *mapping;
2869 inode = ocfs2_lock_res_inode(lockres);
2870 mapping = inode->i_mapping;
/* Writeback failure is logged but not fatal to the downconvert. */
2872 if (filemap_fdatawrite(mapping)) {
2873 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2874 (unsigned long long)OCFS2_I(inode)->ip_blkno);
2876 sync_mapping_buffers(mapping);
2877 if (blocking == LKM_EXMODE) {
2878 truncate_inode_pages(mapping, 0);
2879 unmap_mapping_range(mapping, 0, 0, 0);
2881 /* We only need to wait on the I/O if we're not also
2882 * truncating pages because truncate_inode_pages waits
2883 * for us above. We don't truncate pages if we're
2884 * blocking anything < EXMODE because we want to keep
2885 * them around in that case. */
2886 filemap_fdatawait(mapping);
2889 return UNBLOCK_CONTINUE;
/*
 * Unblock handler for the inode data lock: delegates to the generic
 * engine with ocfs2_data_convert_worker() to flush/invalidate the
 * page cache before the downconvert.
 */
2892 int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2893 struct ocfs2_unblock_ctl *ctl)
2896 struct inode *inode;
2897 struct ocfs2_super *osb;
2901 inode = ocfs2_lock_res_inode(lockres);
2902 osb = OCFS2_SB(inode->i_sb);
2904 mlog(0, "unblock inode %llu\n",
2905 (unsigned long long)OCFS2_I(inode)->ip_blkno);
2907 status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2908 ocfs2_data_convert_worker);
2912 mlog(0, "inode %llu, requeue = %d\n",
2913 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
/*
 * Unblock handler for the plain inode lock: no pre-downconvert work is
 * needed, so the generic engine is called with a NULL worker.
 */
2919 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2920 struct ocfs2_unblock_ctl *ctl)
2923 struct inode *inode;
2927 mlog(0, "Unblock lockres %s\n", lockres->l_name);
2929 inode = ocfs2_lock_res_inode(lockres);
2931 status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2932 lockres, ctl, NULL);
/*
 * Unblock handler for the inode metadata lock: the meta lock has its
 * own LVB/checkpoint-aware path, so this routes to
 * ocfs2_do_unblock_meta() rather than the generic engine.
 */
2940 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2941 struct ocfs2_unblock_ctl *ctl)
2944 struct inode *inode;
2948 inode = ocfs2_lock_res_inode(lockres);
2950 mlog(0, "unblock inode %llu\n",
2951 (unsigned long long)OCFS2_I(inode)->ip_blkno);
2953 status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
2957 mlog(0, "inode %llu, requeue = %d\n",
2958 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2965 * Does the final reference drop on our dentry lock. Right now this
2966 * happens in the vote thread, but we could choose to simplify the
2967 * dlmglue API and push these off to the ocfs2_wq in the future.
/*
 * post_unlock callback for dentry locks: drops the extra reference
 * taken by ocfs2_dentry_convert_worker() once unblock processing is
 * done (see problem (1) in the comment block below it).
 */
2969 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2970 struct ocfs2_lock_res *lockres)
2972 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2973 ocfs2_dentry_lock_put(osb, dl);
2977 * d_delete() matching dentries before the lock downconvert.
2979 * At this point, any process waiting to destroy the
2980 * dentry_lock due to last ref count is stopped by the
2981 * OCFS2_LOCK_QUEUED flag.
2983 * We have two potential problems
2985 * 1) If we do the last reference drop on our dentry_lock (via dput)
2986 * we'll wind up in ocfs2_release_dentry_lock(), waiting on
2987 * the downconvert to finish. Instead we take an elevated
2988 * reference and push the drop until after we've completed our
2989 * unblock processing.
2991 * 2) There might be another process with a final reference,
2992 * waiting on us to finish processing. If this is the case, we
2993 * detect it and exit out - there's no more dentries anyway.
/*
 * Pre-downconvert worker for dentry locks; see the discussion above.
 * Runs without the lockres spinlock held; may sleep (d_delete/dput).
 */
2995 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2998 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2999 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3000 struct dentry *dentry;
3001 unsigned long flags;
3005 * This node is blocking another node from getting a read
3006 * lock. This happens when we've renamed within a
3007 * directory. We've forced the other nodes to d_delete(), but
3008 * we never actually dropped our lock because it's still
3009 * valid. The downconvert code will retain a PR for this node,
3010 * so there's no further work to do.
3012 if (blocking == LKM_PRMODE)
3013 return UNBLOCK_CONTINUE;
3016 * Mark this inode as potentially orphaned. The code in
3017 * ocfs2_delete_inode() will figure out whether it actually
3018 * needs to be freed or not.
3020 spin_lock(&oi->ip_lock);
3021 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3022 spin_unlock(&oi->ip_lock);
3025 * Yuck. We need to make sure however that the check of
3026 * OCFS2_LOCK_FREEING and the extra reference are atomic with
3027 * respect to a reference decrement or the setting of that
/* Both locks held: dentry_attach_lock nests inside l_lock here. */
3030 spin_lock_irqsave(&lockres->l_lock, flags);
3031 spin_lock(&dentry_attach_lock);
/* Take the elevated reference only if the lock isn't already on its
 * way out (ref-bump code elided in this view). */
3032 if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3037 spin_unlock(&dentry_attach_lock);
3038 spin_unlock_irqrestore(&lockres->l_lock, flags);
3040 mlog(0, "extra_ref = %d\n", extra_ref);
3043 * We have a process waiting on us in ocfs2_dentry_iput(),
3044 * which means we can't have any more outstanding
3045 * aliases. There's no need to do any more work.
3048 return UNBLOCK_CONTINUE;
/* Walk and d_delete() every alias of this inode under the parent
 * directory; the find/drop loop structure is partly elided here. */
3050 spin_lock(&dentry_attach_lock);
3052 dentry = ocfs2_find_local_alias(dl->dl_inode,
3053 dl->dl_parent_blkno, 1);
3056 spin_unlock(&dentry_attach_lock);
3058 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3059 dentry->d_name.name);
3062 * The following dcache calls may do an
3063 * iput(). Normally we don't want that from the
3064 * downconverting thread, but in this case it's ok
3065 * because the requesting node already has an
3066 * exclusive lock on the inode, so it can't be queued
3067 * for a downconvert.
3072 spin_lock(&dentry_attach_lock);
3074 spin_unlock(&dentry_attach_lock);
3077 * If we are the last holder of this dentry lock, there is no
3078 * reason to downconvert so skip straight to the unlock.
3080 if (dl->dl_count == 1)
3081 return UNBLOCK_STOP_POST;
/* Downconvert normally, but still run post_unlock to drop the
 * extra reference taken above. */
3083 return UNBLOCK_CONTINUE_POST;
/*
 * Unblock handler for dentry locks: delegates to the generic engine
 * with ocfs2_dentry_convert_worker() to d_delete() aliases before the
 * downconvert.
 */
3086 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
3087 struct ocfs2_unblock_ctl *ctl)
3090 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3091 struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
3093 mlog(0, "unblock dentry lock: %llu\n",
3094 (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
3096 ret = ocfs2_generic_unblock_lock(osb,
3099 ocfs2_dentry_convert_worker);
3103 mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
3108 /* Generic unblock function for any lockres whose private data is an
3109 * ocfs2_super pointer. */
/* Used for the super and rename lock types; no worker needed. */
3110 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
3111 struct ocfs2_unblock_ctl *ctl)
3114 struct ocfs2_super *osb;
3118 mlog(0, "Unblock lockres %s\n", lockres->l_name);
3120 osb = ocfs2_lock_res_super(lockres);
3122 status = ocfs2_generic_unblock_lock(osb,
/*
 * Entry point used by the vote thread to process one queued/blocked
 * lockres: run its type-specific ->unblock handler, requeue it if the
 * handler asks, clear OCFS2_LOCK_QUEUED otherwise, and fire the
 * optional ->post_unlock callback.
 */
3133 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3134 struct ocfs2_lock_res *lockres)
3137 struct ocfs2_unblock_ctl ctl = {0, 0,};
3138 unsigned long flags;
3140 /* Our reference to the lockres in this function can be
3141 * considered valid until we remove the OCFS2_LOCK_QUEUED
3147 BUG_ON(!lockres->l_ops);
3148 BUG_ON(!lockres->l_ops->unblock);
3150 mlog(0, "lockres %s blocked.\n", lockres->l_name);
3152 /* Detect whether a lock has been marked as going away while
3153 * the vote thread was processing other things. A lock can
3154 * still be marked with OCFS2_LOCK_FREEING after this check,
3155 * but short circuiting here will still save us some
3157 spin_lock_irqsave(&lockres->l_lock, flags);
3158 if (lockres->l_flags & OCFS2_LOCK_FREEING)
3160 spin_unlock_irqrestore(&lockres->l_lock, flags);
/* ->unblock may sleep; called without the spinlock held. */
3162 status = lockres->l_ops->unblock(lockres, &ctl);
3166 spin_lock_irqsave(&lockres->l_lock, flags);
/* Either done with this lockres (drop QUEUED) or put it back on the
 * blocked list for another pass. */
3168 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3169 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3171 ocfs2_schedule_blocked_lock(osb, lockres);
3173 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3174 ctl.requeue ? "yes" : "no");
3175 spin_unlock_irqrestore(&lockres->l_lock, flags);
3177 if (ctl.unblock_action != UNBLOCK_CONTINUE
3178 && lockres->l_ops->post_unlock)
3179 lockres->l_ops->post_unlock(osb, lockres);
/*
 * Queue @lockres on the superblock's blocked-lock list for the vote
 * thread, setting OCFS2_LOCK_QUEUED. Skipped entirely for locks being
 * torn down (OCFS2_LOCK_FREEING). Caller must hold lockres->l_lock
 * (asserted below).
 */
3184 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3185 struct ocfs2_lock_res *lockres)
3189 assert_spin_locked(&lockres->l_lock);
3191 if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3192 /* Do not schedule a lock for downconvert when it's on
3193 * the way to destruction - any nodes wanting access
3194 * to the resource will get it soon. */
3195 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3196 lockres->l_name, lockres->l_flags);
3200 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
/* list_empty() guard keeps an already-queued lockres from being
 * double-added (and the count from drifting). */
3202 spin_lock(&osb->vote_task_lock);
3203 if (list_empty(&lockres->l_blocked_list)) {
3204 list_add_tail(&lockres->l_blocked_list,
3205 &osb->blocked_lock_list);
3206 osb->blocked_lock_count++;
3208 spin_unlock(&osb->vote_task_lock);
3213 /* This aids in debugging situations where a bad LVB might be involved. */
/*
 * Dump the meta-lock LVB contents at mlog @level, tagged with the
 * calling function/line. All multi-byte LVB fields are stored
 * big-endian on the wire, hence the be*_to_cpu() conversions.
 */
3214 void ocfs2_dump_meta_lvb_info(u64 level,
3215 const char *function,
3217 struct ocfs2_lock_res *lockres)
3219 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3221 mlog(level, "LVB information for %s (called from %s:%u):\n",
3222 lockres->l_name, function, line);
3223 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3224 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3225 be32_to_cpu(lvb->lvb_igeneration));
3226 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3227 (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3228 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3229 be16_to_cpu(lvb->lvb_imode));
3230 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3231 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3232 (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3233 (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3234 (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3235 be32_to_cpu(lvb->lvb_iattr));