]> err.no Git - linux-2.6/blob - fs/ocfs2/dlmglue.c
ocfs2: Add ->check_downconvert callback in dlmglue
[linux-2.6] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/smp_lock.h>
31 #include <linux/crc32.h>
32 #include <linux/kthread.h>
33 #include <linux/pagemap.h>
34 #include <linux/debugfs.h>
35 #include <linux/seq_file.h>
36
37 #include <cluster/heartbeat.h>
38 #include <cluster/nodemanager.h>
39 #include <cluster/tcp.h>
40
41 #include <dlm/dlmapi.h>
42
43 #define MLOG_MASK_PREFIX ML_DLM_GLUE
44 #include <cluster/masklog.h>
45
46 #include "ocfs2.h"
47
48 #include "alloc.h"
49 #include "dcache.h"
50 #include "dlmglue.h"
51 #include "extent_map.h"
52 #include "heartbeat.h"
53 #include "inode.h"
54 #include "journal.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58 #include "vote.h"
59
60 #include "buffer_head_io.h"
61
62 struct ocfs2_mask_waiter {
63         struct list_head        mw_item;
64         int                     mw_status;
65         struct completion       mw_complete;
66         unsigned long           mw_mask;
67         unsigned long           mw_goal;
68 };
69
/* ->get_osb implementations used by the dentry and inode lock types. */
static struct ocfs2_super *
ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *
ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
72
/*
 * Return value from ocfs2_convert_worker_t functions.
 *
 * These values steer what ocfs2_generic_unblock_lock() and
 * ocfs2_process_blocked_lock() do next.
 */
enum ocfs2_unblock_action {
	/* Continue downconvert. */
	UNBLOCK_CONTINUE	= 0,
	/* Continue downconvert, fire ->post_unlock() callback. */
	UNBLOCK_CONTINUE_POST	= 1,
	/* Do not downconvert, fire ->post_unlock() callback. */
	UNBLOCK_STOP_POST	= 2,
};
87
88 struct ocfs2_unblock_ctl {
89         int requeue;
90         enum ocfs2_unblock_action unblock_action;
91 };
92
/*
 * Per-lock-type ->unblock and ->post_unlock callbacks; they are wired
 * into the ocfs2_lock_res_ops tables in this file.
 */
static int
ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
		   struct ocfs2_unblock_ctl *ctl);
static int
ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
		   struct ocfs2_unblock_ctl *ctl);
static int
ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
			 struct ocfs2_unblock_ctl *ctl);
static int
ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
			  struct ocfs2_unblock_ctl *ctl);
static int
ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
		       struct ocfs2_unblock_ctl *ctl);

static void
ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
			 struct ocfs2_lock_res *lockres);
106
/*
 * OCFS2 Lock Resource Operations
 *
 * Each lock type supplies one of these tables to fine tune how the
 * generic dlmglue locking infrastructure treats it.
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Only
	 * needed when ->l_priv is not itself an ocfs2_super pointer;
	 * when left NULL, ->l_priv is used directly.
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/* Per-type unblock handler; receives the unblock control block. */
	int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);

	/* Optional callback fired for the UNBLOCK_*_POST actions. */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Lets a lock type add its own checks for whether a
	 * downconvert is currently safe. Return 0 to re-queue the
	 * downconvert for later, nonzero to go ahead.
	 *
	 * Most locks do not need this: the default check that there
	 * are no incompatible holders is sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags describing the specific requirements of a
	 * lock type. Each flag is described at its definition.
	 */
	int flags;
};
139
/*
 * Lock types that want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) level is first obtained set this flag.
 * The ast handlers then set OCFS2_LOCK_NEEDS_REFRESH in the lockres'
 * l_flags, and the locking wrapper is expected to clear
 * OCFS2_LOCK_NEEDS_REFRESH once it has refreshed.
 */
#define LOCK_TYPE_REQUIRES_REFRESH	0x1

/*
 * The lock type makes use of the lock value block.
 */
#define LOCK_TYPE_USES_LVB		0x2
154
/*
 * Worker invoked on behalf of ocfs2_generic_unblock_lock(); its return
 * value is one of the enum ocfs2_unblock_action values.
 */
typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);

static int
ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres,
			   struct ocfs2_unblock_ctl *ctl,
			   ocfs2_convert_worker_t *worker);
160
161 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
162         .get_osb        = ocfs2_get_inode_osb,
163         .unblock        = ocfs2_unblock_inode_lock,
164         .flags          = 0,
165 };
166
167 static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
168         .get_osb        = ocfs2_get_inode_osb,
169         .unblock        = ocfs2_unblock_meta,
170         .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
171 };
172
173 static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
174         .get_osb        = ocfs2_get_inode_osb,
175         .unblock        = ocfs2_unblock_data,
176         .flags          = 0,
177 };
178
179 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
180         .unblock        = ocfs2_unblock_osb_lock,
181         .flags          = LOCK_TYPE_REQUIRES_REFRESH,
182 };
183
184 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
185         .unblock        = ocfs2_unblock_osb_lock,
186         .flags          = 0,
187 };
188
189 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
190         .get_osb        = ocfs2_get_dentry_osb,
191         .unblock        = ocfs2_unblock_dentry_lock,
192         .post_unlock    = ocfs2_dentry_post_unlock,
193         .flags          = 0,
194 };
195
196 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
197 {
198         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
199                 lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
200                 lockres->l_type == OCFS2_LOCK_TYPE_RW;
201 }
202
203 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
204 {
205         BUG_ON(!ocfs2_is_inode_lock(lockres));
206
207         return (struct inode *) lockres->l_priv;
208 }
209
210 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
211 {
212         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
213
214         return (struct ocfs2_dentry_lock *)lockres->l_priv;
215 }
216
217 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
218 {
219         if (lockres->l_ops->get_osb)
220                 return lockres->l_ops->get_osb(lockres);
221
222         return (struct ocfs2_super *)lockres->l_priv;
223 }
224
/* Forward declarations for helpers defined later in this file. */
static int
ocfs2_lock_create(struct ocfs2_super *osb,
		  struct ocfs2_lock_res *lockres,
		  int level,
		  int dlm_flags);
static inline int
ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
				   int wanted);
static void
ocfs2_cluster_unlock(struct ocfs2_super *osb,
		     struct ocfs2_lock_res *lockres,
		     int level);
static inline void
ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void
ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void
ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int
ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void
ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
			    struct ocfs2_lock_res *lockres);
static inline void
ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
			     int convert);
/*
 * Log a DLM failure at ML_ERROR level.
 *
 * _func:    name of the DLM call that failed (a string)
 * _stat:    the dlm status it returned
 * _lockres: pointer to the lock resource involved
 *
 * The arguments are parenthesized so that any pointer or string
 * expression may be passed, not just a plain identifier.
 */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {		\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "		\
		"resource %s: %s\n", dlm_errname(_stat), (_func),	\
		(_lockres)->l_name, dlm_errmsg(_stat));			\
} while (0)
/* Further forward declarations for helpers defined later in this file. */
static void
ocfs2_vote_on_unlock(struct ocfs2_super *osb,
		     struct ocfs2_lock_res *lockres);
static int
ocfs2_meta_lock_update(struct inode *inode,
		       struct buffer_head **bh);
static void
ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int
ocfs2_highest_compat_lock_level(int level);
static inline int
ocfs2_can_downconvert_meta_lock(struct inode *inode,
				struct ocfs2_lock_res *lockres,
				int new_level);
256
257 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
258                                   u64 blkno,
259                                   u32 generation,
260                                   char *name)
261 {
262         int len;
263
264         mlog_entry_void();
265
266         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
267
268         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
269                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
270                        (long long)blkno, generation);
271
272         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
273
274         mlog(0, "built lock resource with name: %s\n", name);
275
276         mlog_exit_void();
277 }
278
/* Guards insertion into and removal from the lockres tracking lists. */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
280
281 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
282                                        struct ocfs2_dlm_debug *dlm_debug)
283 {
284         mlog(0, "Add tracking for lockres %s\n", res->l_name);
285
286         spin_lock(&ocfs2_dlm_tracking_lock);
287         list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
288         spin_unlock(&ocfs2_dlm_tracking_lock);
289 }
290
291 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
292 {
293         spin_lock(&ocfs2_dlm_tracking_lock);
294         if (!list_empty(&res->l_debug_list))
295                 list_del_init(&res->l_debug_list);
296         spin_unlock(&ocfs2_dlm_tracking_lock);
297 }
298
299 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
300                                        struct ocfs2_lock_res *res,
301                                        enum ocfs2_lock_type type,
302                                        struct ocfs2_lock_res_ops *ops,
303                                        void *priv)
304 {
305         res->l_type          = type;
306         res->l_ops           = ops;
307         res->l_priv          = priv;
308
309         res->l_level         = LKM_IVMODE;
310         res->l_requested     = LKM_IVMODE;
311         res->l_blocking      = LKM_IVMODE;
312         res->l_action        = OCFS2_AST_INVALID;
313         res->l_unlock_action = OCFS2_UNLOCK_INVALID;
314
315         res->l_flags         = OCFS2_LOCK_INITIALIZED;
316
317         ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
318 }
319
320 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
321 {
322         /* This also clears out the lock status block */
323         memset(res, 0, sizeof(struct ocfs2_lock_res));
324         spin_lock_init(&res->l_lock);
325         init_waitqueue_head(&res->l_event);
326         INIT_LIST_HEAD(&res->l_blocked_list);
327         INIT_LIST_HEAD(&res->l_mask_waiters);
328 }
329
330 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
331                                enum ocfs2_lock_type type,
332                                unsigned int generation,
333                                struct inode *inode)
334 {
335         struct ocfs2_lock_res_ops *ops;
336
337         switch(type) {
338                 case OCFS2_LOCK_TYPE_RW:
339                         ops = &ocfs2_inode_rw_lops;
340                         break;
341                 case OCFS2_LOCK_TYPE_META:
342                         ops = &ocfs2_inode_meta_lops;
343                         break;
344                 case OCFS2_LOCK_TYPE_DATA:
345                         ops = &ocfs2_inode_data_lops;
346                         break;
347                 default:
348                         mlog_bug_on_msg(1, "type: %d\n", type);
349                         ops = NULL; /* thanks, gcc */
350                         break;
351         };
352
353         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
354                               generation, res->l_name);
355         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
356 }
357
358 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
359 {
360         struct inode *inode = ocfs2_lock_res_inode(lockres);
361
362         return OCFS2_SB(inode->i_sb);
363 }
364
365 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
366 {
367         __be64 inode_blkno_be;
368
369         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
370                sizeof(__be64));
371
372         return be64_to_cpu(inode_blkno_be);
373 }
374
375 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
376 {
377         struct ocfs2_dentry_lock *dl = lockres->l_priv;
378
379         return OCFS2_SB(dl->dl_inode->i_sb);
380 }
381
382 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
383                                 u64 parent, struct inode *inode)
384 {
385         int len;
386         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
387         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
388         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
389
390         ocfs2_lock_res_init_once(lockres);
391
392         /*
393          * Unfortunately, the standard lock naming scheme won't work
394          * here because we have two 16 byte values to use. Instead,
395          * we'll stuff the inode number as a binary value. We still
396          * want error prints to show something without garbling the
397          * display, so drop a null byte in there before the inode
398          * number. A future version of OCFS2 will likely use all
399          * binary lock names. The stringified names have been a
400          * tremendous aid in debugging, but now that the debugfs
401          * interface exists, we can mangle things there if need be.
402          *
403          * NOTE: We also drop the standard "pad" value (the total lock
404          * name size stays the same though - the last part is all
405          * zeros due to the memset in ocfs2_lock_res_init_once()
406          */
407         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
408                        "%c%016llx",
409                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
410                        (long long)parent);
411
412         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
413
414         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
415                sizeof(__be64));
416
417         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
418                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
419                                    dl);
420 }
421
422 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
423                                       struct ocfs2_super *osb)
424 {
425         /* Superblock lockres doesn't come from a slab so we call init
426          * once on it manually.  */
427         ocfs2_lock_res_init_once(res);
428         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
429                               0, res->l_name);
430         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
431                                    &ocfs2_super_lops, osb);
432 }
433
434 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
435                                        struct ocfs2_super *osb)
436 {
437         /* Rename lockres doesn't come from a slab so we call init
438          * once on it manually.  */
439         ocfs2_lock_res_init_once(res);
440         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
441         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
442                                    &ocfs2_rename_lops, osb);
443 }
444
445 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
446 {
447         mlog_entry_void();
448
449         if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
450                 return;
451
452         ocfs2_remove_lockres_tracking(res);
453
454         mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
455                         "Lockres %s is on the blocked list\n",
456                         res->l_name);
457         mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
458                         "Lockres %s has mask waiters pending\n",
459                         res->l_name);
460         mlog_bug_on_msg(spin_is_locked(&res->l_lock),
461                         "Lockres %s is locked\n",
462                         res->l_name);
463         mlog_bug_on_msg(res->l_ro_holders,
464                         "Lockres %s has %u ro holders\n",
465                         res->l_name, res->l_ro_holders);
466         mlog_bug_on_msg(res->l_ex_holders,
467                         "Lockres %s has %u ex holders\n",
468                         res->l_name, res->l_ex_holders);
469
470         /* Need to clear out the lock status block for the dlm */
471         memset(&res->l_lksb, 0, sizeof(res->l_lksb));
472
473         res->l_flags = 0UL;
474         mlog_exit_void();
475 }
476
477 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
478                                      int level)
479 {
480         mlog_entry_void();
481
482         BUG_ON(!lockres);
483
484         switch(level) {
485         case LKM_EXMODE:
486                 lockres->l_ex_holders++;
487                 break;
488         case LKM_PRMODE:
489                 lockres->l_ro_holders++;
490                 break;
491         default:
492                 BUG();
493         }
494
495         mlog_exit_void();
496 }
497
498 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
499                                      int level)
500 {
501         mlog_entry_void();
502
503         BUG_ON(!lockres);
504
505         switch(level) {
506         case LKM_EXMODE:
507                 BUG_ON(!lockres->l_ex_holders);
508                 lockres->l_ex_holders--;
509                 break;
510         case LKM_PRMODE:
511                 BUG_ON(!lockres->l_ro_holders);
512                 lockres->l_ro_holders--;
513                 break;
514         default:
515                 BUG();
516         }
517         mlog_exit_void();
518 }
519
520 /* WARNING: This function lives in a world where the only three lock
521  * levels are EX, PR, and NL. It *will* have to be adjusted when more
522  * lock types are added. */
523 static inline int ocfs2_highest_compat_lock_level(int level)
524 {
525         int new_level = LKM_EXMODE;
526
527         if (level == LKM_EXMODE)
528                 new_level = LKM_NLMODE;
529         else if (level == LKM_PRMODE)
530                 new_level = LKM_PRMODE;
531         return new_level;
532 }
533
534 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
535                               unsigned long newflags)
536 {
537         struct list_head *pos, *tmp;
538         struct ocfs2_mask_waiter *mw;
539
540         assert_spin_locked(&lockres->l_lock);
541
542         lockres->l_flags = newflags;
543
544         list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
545                 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
546                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
547                         continue;
548
549                 list_del_init(&mw->mw_item);
550                 mw->mw_status = 0;
551                 complete(&mw->mw_complete);
552         }
553 }
554 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
555 {
556         lockres_set_flags(lockres, lockres->l_flags | or);
557 }
558 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
559                                 unsigned long clear)
560 {
561         lockres_set_flags(lockres, lockres->l_flags & ~clear);
562 }
563
564 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
565 {
566         mlog_entry_void();
567
568         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
569         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
570         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
571         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
572
573         lockres->l_level = lockres->l_requested;
574         if (lockres->l_level <=
575             ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
576                 lockres->l_blocking = LKM_NLMODE;
577                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
578         }
579         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
580
581         mlog_exit_void();
582 }
583
584 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
585 {
586         mlog_entry_void();
587
588         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
589         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
590
591         /* Convert from RO to EX doesn't really need anything as our
592          * information is already up to data. Convert from NL to
593          * *anything* however should mark ourselves as needing an
594          * update */
595         if (lockres->l_level == LKM_NLMODE &&
596             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
597                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
598
599         lockres->l_level = lockres->l_requested;
600         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
601
602         mlog_exit_void();
603 }
604
605 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
606 {
607         mlog_entry_void();
608
609         BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
610         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
611
612         if (lockres->l_requested > LKM_NLMODE &&
613             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
614             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
615                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
616
617         lockres->l_level = lockres->l_requested;
618         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
619         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
620
621         mlog_exit_void();
622 }
623
624 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
625                                      int level)
626 {
627         int needs_downconvert = 0;
628         mlog_entry_void();
629
630         assert_spin_locked(&lockres->l_lock);
631
632         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
633
634         if (level > lockres->l_blocking) {
635                 /* only schedule a downconvert if we haven't already scheduled
636                  * one that goes low enough to satisfy the level we're
637                  * blocking.  this also catches the case where we get
638                  * duplicate BASTs */
639                 if (ocfs2_highest_compat_lock_level(level) <
640                     ocfs2_highest_compat_lock_level(lockres->l_blocking))
641                         needs_downconvert = 1;
642
643                 lockres->l_blocking = level;
644         }
645
646         mlog_exit(needs_downconvert);
647         return needs_downconvert;
648 }
649
650 static void ocfs2_blocking_ast(void *opaque, int level)
651 {
652         struct ocfs2_lock_res *lockres = opaque;
653         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
654         int needs_downconvert;
655         unsigned long flags;
656
657         BUG_ON(level <= LKM_NLMODE);
658
659         mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
660              lockres->l_name, level, lockres->l_level,
661              ocfs2_lock_type_string(lockres->l_type));
662
663         spin_lock_irqsave(&lockres->l_lock, flags);
664         needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
665         if (needs_downconvert)
666                 ocfs2_schedule_blocked_lock(osb, lockres);
667         spin_unlock_irqrestore(&lockres->l_lock, flags);
668
669         wake_up(&lockres->l_event);
670
671         ocfs2_kick_vote_thread(osb);
672 }
673
674 static void ocfs2_locking_ast(void *opaque)
675 {
676         struct ocfs2_lock_res *lockres = opaque;
677         struct dlm_lockstatus *lksb = &lockres->l_lksb;
678         unsigned long flags;
679
680         spin_lock_irqsave(&lockres->l_lock, flags);
681
682         if (lksb->status != DLM_NORMAL) {
683                 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
684                      lockres->l_name, lksb->status);
685                 spin_unlock_irqrestore(&lockres->l_lock, flags);
686                 return;
687         }
688
689         switch(lockres->l_action) {
690         case OCFS2_AST_ATTACH:
691                 ocfs2_generic_handle_attach_action(lockres);
692                 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
693                 break;
694         case OCFS2_AST_CONVERT:
695                 ocfs2_generic_handle_convert_action(lockres);
696                 break;
697         case OCFS2_AST_DOWNCONVERT:
698                 ocfs2_generic_handle_downconvert_action(lockres);
699                 break;
700         default:
701                 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
702                      "lockres flags = 0x%lx, unlock action: %u\n",
703                      lockres->l_name, lockres->l_action, lockres->l_flags,
704                      lockres->l_unlock_action);
705                 BUG();
706         }
707
708         /* set it to something invalid so if we get called again we
709          * can catch it. */
710         lockres->l_action = OCFS2_AST_INVALID;
711
712         wake_up(&lockres->l_event);
713         spin_unlock_irqrestore(&lockres->l_lock, flags);
714 }
715
716 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
717                                                 int convert)
718 {
719         unsigned long flags;
720
721         mlog_entry_void();
722         spin_lock_irqsave(&lockres->l_lock, flags);
723         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
724         if (convert)
725                 lockres->l_action = OCFS2_AST_INVALID;
726         else
727                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
728         spin_unlock_irqrestore(&lockres->l_lock, flags);
729
730         wake_up(&lockres->l_event);
731         mlog_exit_void();
732 }
733
734 /* Note: If we detect another process working on the lock (i.e.,
735  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
736  * to do the right thing in that case.
737  */
738 static int ocfs2_lock_create(struct ocfs2_super *osb,
739                              struct ocfs2_lock_res *lockres,
740                              int level,
741                              int dlm_flags)
742 {
743         int ret = 0;
744         enum dlm_status status;
745         unsigned long flags;
746
747         mlog_entry_void();
748
749         mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
750              dlm_flags);
751
752         spin_lock_irqsave(&lockres->l_lock, flags);
753         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
754             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
755                 spin_unlock_irqrestore(&lockres->l_lock, flags);
756                 goto bail;
757         }
758
759         lockres->l_action = OCFS2_AST_ATTACH;
760         lockres->l_requested = level;
761         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
762         spin_unlock_irqrestore(&lockres->l_lock, flags);
763
764         status = dlmlock(osb->dlm,
765                          level,
766                          &lockres->l_lksb,
767                          dlm_flags,
768                          lockres->l_name,
769                          OCFS2_LOCK_ID_MAX_LEN - 1,
770                          ocfs2_locking_ast,
771                          lockres,
772                          ocfs2_blocking_ast);
773         if (status != DLM_NORMAL) {
774                 ocfs2_log_dlm_error("dlmlock", status, lockres);
775                 ret = -EINVAL;
776                 ocfs2_recover_from_dlm_error(lockres, 1);
777         }
778
779         mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
780
781 bail:
782         mlog_exit(ret);
783         return ret;
784 }
785
786 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
787                                         int flag)
788 {
789         unsigned long flags;
790         int ret;
791
792         spin_lock_irqsave(&lockres->l_lock, flags);
793         ret = lockres->l_flags & flag;
794         spin_unlock_irqrestore(&lockres->l_lock, flags);
795
796         return ret;
797 }
798
799 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
800
801 {
802         wait_event(lockres->l_event,
803                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
804 }
805
806 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
807
808 {
809         wait_event(lockres->l_event,
810                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
811 }
812
813 /* predict what lock level we'll be dropping down to on behalf
814  * of another node, and return true if the currently wanted
815  * level will be compatible with it. */
816 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
817                                                      int wanted)
818 {
819         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
820
821         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
822 }
823
824 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
825 {
826         INIT_LIST_HEAD(&mw->mw_item);
827         init_completion(&mw->mw_complete);
828 }
829
830 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
831 {
832         wait_for_completion(&mw->mw_complete);
833         /* Re-arm the completion in case we want to wait on it again */
834         INIT_COMPLETION(mw->mw_complete);
835         return mw->mw_status;
836 }
837
838 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
839                                     struct ocfs2_mask_waiter *mw,
840                                     unsigned long mask,
841                                     unsigned long goal)
842 {
843         BUG_ON(!list_empty(&mw->mw_item));
844
845         assert_spin_locked(&lockres->l_lock);
846
847         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
848         mw->mw_mask = mask;
849         mw->mw_goal = goal;
850 }
851
852 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
853  * if the mask still hadn't reached its goal */
854 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
855                                       struct ocfs2_mask_waiter *mw)
856 {
857         unsigned long flags;
858         int ret = 0;
859
860         spin_lock_irqsave(&lockres->l_lock, flags);
861         if (!list_empty(&mw->mw_item)) {
862                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
863                         ret = -EBUSY;
864
865                 list_del_init(&mw->mw_item);
866                 init_completion(&mw->mw_complete);
867         }
868         spin_unlock_irqrestore(&lockres->l_lock, flags);
869
870         return ret;
871
872 }
873
/*
 * Take a cluster lock on 'lockres' at 'level' and, on success, bump the
 * holder count for that level via ocfs2_inc_holders().
 *
 * lkm_flags is handed to dlmlock() for the convert request; LKM_VALBLK
 * is OR'd in when the lock type has LOCK_TYPE_USES_LVB set.  arg_flags
 * is only examined for OCFS2_LOCK_NONBLOCK.
 *
 * The body is a retry loop around the 'again' label.  Each pass, under
 * l_lock:
 *  - BUSY and a higher level is wanted: queue a mask waiter for BUSY to
 *    clear;
 *  - not ATTACHED yet: create the lock at LKM_NLMODE and retry;
 *  - BLOCKED and 'level' is not compatible with the level we will be
 *    downconverting to: queue a mask waiter for BLOCKED to clear;
 *  - granted level too low: mark BUSY, issue an LKM_CONVERT and retry
 *    (signals are no longer honoured from this point on);
 *  - otherwise: take the holder reference and return 0.
 *
 * With OCFS2_LOCK_NONBLOCK, a caller that would have had to wait gets
 * -EAGAIN instead, unless the awaited condition was already satisfied
 * by the time the waiter is removed, in which case we just retry.
 *
 * Returns 0 on success, -ERESTARTSYS if a signal is pending (and the
 * mount is not OCFS2_MOUNT_NOINTR), -EAGAIN as described above or for
 * an LKM_NOQUEUE convert that returned DLM_NOTQUEUED, -EINVAL for any
 * other dlmlock() failure, or the error from ocfs2_lock_create() /
 * the mask waiter's status.
 */
874 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
875                               struct ocfs2_lock_res *lockres,
876                               int level,
877                               int lkm_flags,
878                               int arg_flags)
879 {
880         struct ocfs2_mask_waiter mw;
881         enum dlm_status status;
882         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
883         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
884         unsigned long flags;
885
886         mlog_entry_void();
887
888         ocfs2_init_mask_waiter(&mw);
889
890         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
891                 lkm_flags |= LKM_VALBLK;
892
893 again:
894         wait = 0;
895
896         if (catch_signals && signal_pending(current)) {
897                 ret = -ERESTARTSYS;
898                 goto out;
899         }
900
901         spin_lock_irqsave(&lockres->l_lock, flags);
902
903         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
904                         "Cluster lock called on freeing lockres %s! flags "
905                         "0x%lx\n", lockres->l_name, lockres->l_flags);
906
907         /* We only compare against the currently granted level
908          * here. If the lock is blocked waiting on a downconvert,
909          * we'll get caught below. */
910         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
911             level > lockres->l_level) {
912                 /* is someone sitting in dlm_lock? If so, wait on
913                  * them. */
914                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
915                 wait = 1;
916                 goto unlock;
917         }
918
919         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
920                 /* lock has not been created yet. */
921                 spin_unlock_irqrestore(&lockres->l_lock, flags);
922
923                 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
924                 if (ret < 0) {
925                         mlog_errno(ret);
926                         goto out;
927                 }
928                 goto again;
929         }
930
931         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
932             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
933                 /* is the lock is currently blocked on behalf of
934                  * another node */
935                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
936                 wait = 1;
937                 goto unlock;
938         }
939
940         if (level > lockres->l_level) {
941                 if (lockres->l_action != OCFS2_AST_INVALID)
942                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
943                              lockres->l_name, lockres->l_action);
944
945                 lockres->l_action = OCFS2_AST_CONVERT;
946                 lockres->l_requested = level;
947                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
948                 spin_unlock_irqrestore(&lockres->l_lock, flags);
949
950                 BUG_ON(level == LKM_IVMODE);
951                 BUG_ON(level == LKM_NLMODE);
952
953                 mlog(0, "lock %s, convert from %d to level = %d\n",
954                      lockres->l_name, lockres->l_level, level);
955
956                 /* call dlm_lock to upgrade lock now */
957                 status = dlmlock(osb->dlm,
958                                  level,
959                                  &lockres->l_lksb,
960                                  lkm_flags|LKM_CONVERT,
961                                  lockres->l_name,
962                                  OCFS2_LOCK_ID_MAX_LEN - 1,
963                                  ocfs2_locking_ast,
964                                  lockres,
965                                  ocfs2_blocking_ast);
966                 if (status != DLM_NORMAL) {
967                         if ((lkm_flags & LKM_NOQUEUE) &&
968                             (status == DLM_NOTQUEUED))
969                                 ret = -EAGAIN;
970                         else {
971                                 ocfs2_log_dlm_error("dlmlock", status,
972                                                     lockres);
973                                 ret = -EINVAL;
974                         }
975                         ocfs2_recover_from_dlm_error(lockres, 1);
976                         goto out;
977                 }
978
979                 mlog(0, "lock %s, successfull return from dlmlock\n",
980                      lockres->l_name);
981
982                 /* At this point we've gone inside the dlm and need to
983                  * complete our work regardless. */
984                 catch_signals = 0;
985
986                 /* wait for busy to clear and carry on */
987                 goto again;
988         }
989
990         /* Ok, if we get here then we're good to go. */
991         ocfs2_inc_holders(lockres, level);
992
993         ret = 0;
994 unlock:
995         spin_unlock_irqrestore(&lockres->l_lock, flags);
996 out:
997         /*
998          * This is helping work around a lock inversion between the page lock
999          * and dlm locks.  One path holds the page lock while calling aops
1000          * which block acquiring dlm locks.  The voting thread holds dlm
1001          * locks while acquiring page locks while down converting data locks.
1002          * This block is helping an aop path notice the inversion and back
1003          * off to unlock its page lock before trying the dlm lock again.
1004          */
1005         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1006             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1007                 wait = 0;
1008                 if (lockres_remove_mask_waiter(lockres, &mw))
1009                         ret = -EAGAIN;
1010                 else
1011                         goto again;
1012         }
1013         if (wait) {
1014                 ret = ocfs2_wait_for_mask(&mw);
1015                 if (ret == 0)
1016                         goto again;
1017                 mlog_errno(ret);
1018         }
1019
1020         mlog_exit(ret);
1021         return ret;
1022 }
1023
1024 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1025                                  struct ocfs2_lock_res *lockres,
1026                                  int level)
1027 {
1028         unsigned long flags;
1029
1030         mlog_entry_void();
1031         spin_lock_irqsave(&lockres->l_lock, flags);
1032         ocfs2_dec_holders(lockres, level);
1033         ocfs2_vote_on_unlock(osb, lockres);
1034         spin_unlock_irqrestore(&lockres->l_lock, flags);
1035         mlog_exit_void();
1036 }
1037
1038 int ocfs2_create_new_lock(struct ocfs2_super *osb,
1039                           struct ocfs2_lock_res *lockres,
1040                           int ex,
1041                           int local)
1042 {
1043         int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1044         unsigned long flags;
1045         int lkm_flags = local ? LKM_LOCAL : 0;
1046
1047         spin_lock_irqsave(&lockres->l_lock, flags);
1048         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1049         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1050         spin_unlock_irqrestore(&lockres->l_lock, flags);
1051
1052         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1053 }
1054
1055 /* Grants us an EX lock on the data and metadata resources, skipping
1056  * the normal cluster directory lookup. Use this ONLY on newly created
1057  * inodes which other nodes can't possibly see, and which haven't been
1058  * hashed in the inode hash yet. This can give us a good performance
1059  * increase as it'll skip the network broadcast normally associated
1060  * with creating a new lock resource. */
1061 int ocfs2_create_new_inode_locks(struct inode *inode)
1062 {
1063         int ret;
1064         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1065
1066         BUG_ON(!inode);
1067         BUG_ON(!ocfs2_inode_is_new(inode));
1068
1069         mlog_entry_void();
1070
1071         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1072
1073         /* NOTE: That we don't increment any of the holder counts, nor
1074          * do we add anything to a journal handle. Since this is
1075          * supposed to be a new inode which the cluster doesn't know
1076          * about yet, there is no need to.  As far as the LVB handling
1077          * is concerned, this is basically like acquiring an EX lock
1078          * on a resource which has an invalid one -- we'll set it
1079          * valid when we release the EX. */
1080
1081         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1082         if (ret) {
1083                 mlog_errno(ret);
1084                 goto bail;
1085         }
1086
1087         /*
1088          * We don't want to use LKM_LOCAL on a meta data lock as they
1089          * don't use a generation in their lock names.
1090          */
1091         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1092         if (ret) {
1093                 mlog_errno(ret);
1094                 goto bail;
1095         }
1096
1097         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1098         if (ret) {
1099                 mlog_errno(ret);
1100                 goto bail;
1101         }
1102
1103 bail:
1104         mlog_exit(ret);
1105         return ret;
1106 }
1107
1108 int ocfs2_rw_lock(struct inode *inode, int write)
1109 {
1110         int status, level;
1111         struct ocfs2_lock_res *lockres;
1112
1113         BUG_ON(!inode);
1114
1115         mlog_entry_void();
1116
1117         mlog(0, "inode %llu take %s RW lock\n",
1118              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1119              write ? "EXMODE" : "PRMODE");
1120
1121         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1122
1123         level = write ? LKM_EXMODE : LKM_PRMODE;
1124
1125         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1126                                     0);
1127         if (status < 0)
1128                 mlog_errno(status);
1129
1130         mlog_exit(status);
1131         return status;
1132 }
1133
1134 void ocfs2_rw_unlock(struct inode *inode, int write)
1135 {
1136         int level = write ? LKM_EXMODE : LKM_PRMODE;
1137         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1138
1139         mlog_entry_void();
1140
1141         mlog(0, "inode %llu drop %s RW lock\n",
1142              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1143              write ? "EXMODE" : "PRMODE");
1144
1145         ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1146
1147         mlog_exit_void();
1148 }
1149
1150 int ocfs2_data_lock_full(struct inode *inode,
1151                          int write,
1152                          int arg_flags)
1153 {
1154         int status = 0, level;
1155         struct ocfs2_lock_res *lockres;
1156
1157         BUG_ON(!inode);
1158
1159         mlog_entry_void();
1160
1161         mlog(0, "inode %llu take %s DATA lock\n",
1162              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1163              write ? "EXMODE" : "PRMODE");
1164
1165         /* We'll allow faking a readonly data lock for
1166          * rodevices. */
1167         if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1168                 if (write) {
1169                         status = -EROFS;
1170                         mlog_errno(status);
1171                 }
1172                 goto out;
1173         }
1174
1175         lockres = &OCFS2_I(inode)->ip_data_lockres;
1176
1177         level = write ? LKM_EXMODE : LKM_PRMODE;
1178
1179         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1180                                     0, arg_flags);
1181         if (status < 0 && status != -EAGAIN)
1182                 mlog_errno(status);
1183
1184 out:
1185         mlog_exit(status);
1186         return status;
1187 }
1188
1189 /* see ocfs2_meta_lock_with_page() */
1190 int ocfs2_data_lock_with_page(struct inode *inode,
1191                               int write,
1192                               struct page *page)
1193 {
1194         int ret;
1195
1196         ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1197         if (ret == -EAGAIN) {
1198                 unlock_page(page);
1199                 if (ocfs2_data_lock(inode, write) == 0)
1200                         ocfs2_data_unlock(inode, write);
1201                 ret = AOP_TRUNCATED_PAGE;
1202         }
1203
1204         return ret;
1205 }
1206
1207 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1208                                  struct ocfs2_lock_res *lockres)
1209 {
1210         int kick = 0;
1211
1212         mlog_entry_void();
1213
1214         /* If we know that another node is waiting on our lock, kick
1215          * the vote thread * pre-emptively when we reach a release
1216          * condition. */
1217         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1218                 switch(lockres->l_blocking) {
1219                 case LKM_EXMODE:
1220                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1221                                 kick = 1;
1222                         break;
1223                 case LKM_PRMODE:
1224                         if (!lockres->l_ex_holders)
1225                                 kick = 1;
1226                         break;
1227                 default:
1228                         BUG();
1229                 }
1230         }
1231
1232         if (kick)
1233                 ocfs2_kick_vote_thread(osb);
1234
1235         mlog_exit_void();
1236 }
1237
1238 void ocfs2_data_unlock(struct inode *inode,
1239                        int write)
1240 {
1241         int level = write ? LKM_EXMODE : LKM_PRMODE;
1242         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1243
1244         mlog_entry_void();
1245
1246         mlog(0, "inode %llu drop %s DATA lock\n",
1247              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1248              write ? "EXMODE" : "PRMODE");
1249
1250         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1251                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1252
1253         mlog_exit_void();
1254 }
1255
/* Layout of a packed 64-bit timestamp: the top OCFS2_SEC_BITS bits hold
 * seconds, the remaining low bits hold nanoseconds. */
#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1259
1260 /* LVB only has room for 64 bits of time here so we pack it for
1261  * now. */
1262 static u64 ocfs2_pack_timespec(struct timespec *spec)
1263 {
1264         u64 res;
1265         u64 sec = spec->tv_sec;
1266         u32 nsec = spec->tv_nsec;
1267
1268         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1269
1270         return res;
1271 }
1272
1273 /* Call this with the lockres locked. I am reasonably sure we don't
1274  * need ip_lock in this function as anyone who would be changing those
1275  * values is supposed to be blocked in ocfs2_meta_lock right now. */
1276 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1277 {
1278         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1279         struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1280         struct ocfs2_meta_lvb *lvb;
1281
1282         mlog_entry_void();
1283
1284         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1285
1286         /*
1287          * Invalidate the LVB of a deleted inode - this way other
1288          * nodes are forced to go to disk and discover the new inode
1289          * status.
1290          */
1291         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1292                 lvb->lvb_version = 0;
1293                 goto out;
1294         }
1295
1296         lvb->lvb_version   = OCFS2_LVB_VERSION;
1297         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
1298         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1299         lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1300         lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1301         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1302         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1303         lvb->lvb_iatime_packed  =
1304                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1305         lvb->lvb_ictime_packed =
1306                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1307         lvb->lvb_imtime_packed =
1308                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1309         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1310         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1311
1312 out:
1313         mlog_meta_lvb(0, lockres);
1314
1315         mlog_exit_void();
1316 }
1317
1318 static void ocfs2_unpack_timespec(struct timespec *spec,
1319                                   u64 packed_time)
1320 {
1321         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1322         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1323 }
1324
1325 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1326 {
1327         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1328         struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1329         struct ocfs2_meta_lvb *lvb;
1330
1331         mlog_entry_void();
1332
1333         mlog_meta_lvb(0, lockres);
1334
1335         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1336
1337         /* We're safe here without the lockres lock... */
1338         spin_lock(&oi->ip_lock);
1339         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1340         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1341
1342         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1343         ocfs2_set_inode_flags(inode);
1344
1345         /* fast-symlinks are a special case */
1346         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1347                 inode->i_blocks = 0;
1348         else
1349                 inode->i_blocks =
1350                         ocfs2_align_bytes_to_sectors(i_size_read(inode));
1351
1352         inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1353         inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1354         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1355         inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1356         ocfs2_unpack_timespec(&inode->i_atime,
1357                               be64_to_cpu(lvb->lvb_iatime_packed));
1358         ocfs2_unpack_timespec(&inode->i_mtime,
1359                               be64_to_cpu(lvb->lvb_imtime_packed));
1360         ocfs2_unpack_timespec(&inode->i_ctime,
1361                               be64_to_cpu(lvb->lvb_ictime_packed));
1362         spin_unlock(&oi->ip_lock);
1363
1364         mlog_exit_void();
1365 }
1366
1367 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1368                                               struct ocfs2_lock_res *lockres)
1369 {
1370         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1371
1372         if (lvb->lvb_version == OCFS2_LVB_VERSION
1373             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1374                 return 1;
1375         return 0;
1376 }
1377
1378 /* Determine whether a lock resource needs to be refreshed, and
1379  * arbitrate who gets to refresh it.
1380  *
1381  *   0 means no refresh needed.
1382  *
1383  *   > 0 means you need to refresh this and you MUST call
1384  *   ocfs2_complete_lock_res_refresh afterwards. */
1385 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1386 {
1387         unsigned long flags;
1388         int status = 0;
1389
1390         mlog_entry_void();
1391
1392 refresh_check:
1393         spin_lock_irqsave(&lockres->l_lock, flags);
1394         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1395                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1396                 goto bail;
1397         }
1398
1399         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1400                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1401
1402                 ocfs2_wait_on_refreshing_lock(lockres);
1403                 goto refresh_check;
1404         }
1405
1406         /* Ok, I'll be the one to refresh this lock. */
1407         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1408         spin_unlock_irqrestore(&lockres->l_lock, flags);
1409
1410         status = 1;
1411 bail:
1412         mlog_exit(status);
1413         return status;
1414 }
1415
1416 /* If status is non zero, I'll mark it as not being in refresh
1417  * anymroe, but i won't clear the needs refresh flag. */
1418 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1419                                                    int status)
1420 {
1421         unsigned long flags;
1422         mlog_entry_void();
1423
1424         spin_lock_irqsave(&lockres->l_lock, flags);
1425         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1426         if (!status)
1427                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1428         spin_unlock_irqrestore(&lockres->l_lock, flags);
1429
1430         wake_up(&lockres->l_event);
1431
1432         mlog_exit_void();
1433 }
1434
1435 /* may or may not return a bh if it went to disk. */
1436 static int ocfs2_meta_lock_update(struct inode *inode,
1437                                   struct buffer_head **bh)
1438 {
1439         int status = 0;
1440         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1441         struct ocfs2_lock_res *lockres;
1442         struct ocfs2_dinode *fe;
1443
1444         mlog_entry_void();
1445
1446         spin_lock(&oi->ip_lock);
1447         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1448                 mlog(0, "Orphaned inode %llu was deleted while we "
1449                      "were waiting on a lock. ip_flags = 0x%x\n",
1450                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
1451                 spin_unlock(&oi->ip_lock);
1452                 status = -ENOENT;
1453                 goto bail;
1454         }
1455         spin_unlock(&oi->ip_lock);
1456
1457         lockres = &oi->ip_meta_lockres;
1458
1459         if (!ocfs2_should_refresh_lock_res(lockres))
1460                 goto bail;
1461
1462         /* This will discard any caching information we might have had
1463          * for the inode metadata. */
1464         ocfs2_metadata_cache_purge(inode);
1465
1466         /* will do nothing for inode types that don't use the extent
1467          * map (directories, bitmap files, etc) */
1468         ocfs2_extent_map_trunc(inode, 0);
1469
1470         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1471                 mlog(0, "Trusting LVB on inode %llu\n",
1472                      (unsigned long long)oi->ip_blkno);
1473                 ocfs2_refresh_inode_from_lvb(inode);
1474         } else {
1475                 /* Boo, we have to go to disk. */
1476                 /* read bh, cast, ocfs2_refresh_inode */
1477                 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1478                                           bh, OCFS2_BH_CACHED, inode);
1479                 if (status < 0) {
1480                         mlog_errno(status);
1481                         goto bail_refresh;
1482                 }
1483                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
1484
1485                 /* This is a good chance to make sure we're not
1486                  * locking an invalid object.
1487                  *
1488                  * We bug on a stale inode here because we checked
1489                  * above whether it was wiped from disk. The wiping
1490                  * node provides a guarantee that we receive that
1491                  * message and can mark the inode before dropping any
1492                  * locks associated with it. */
1493                 if (!OCFS2_IS_VALID_DINODE(fe)) {
1494                         OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1495                         status = -EIO;
1496                         goto bail_refresh;
1497                 }
1498                 mlog_bug_on_msg(inode->i_generation !=
1499                                 le32_to_cpu(fe->i_generation),
1500                                 "Invalid dinode %llu disk generation: %u "
1501                                 "inode->i_generation: %u\n",
1502                                 (unsigned long long)oi->ip_blkno,
1503                                 le32_to_cpu(fe->i_generation),
1504                                 inode->i_generation);
1505                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1506                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1507                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
1508                                 (unsigned long long)oi->ip_blkno,
1509                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
1510                                 le32_to_cpu(fe->i_flags));
1511
1512                 ocfs2_refresh_inode(inode, fe);
1513         }
1514
1515         status = 0;
1516 bail_refresh:
1517         ocfs2_complete_lock_res_refresh(lockres, status);
1518 bail:
1519         mlog_exit(status);
1520         return status;
1521 }
1522
1523 static int ocfs2_assign_bh(struct inode *inode,
1524                            struct buffer_head **ret_bh,
1525                            struct buffer_head *passed_bh)
1526 {
1527         int status;
1528
1529         if (passed_bh) {
1530                 /* Ok, the update went to disk for us, use the
1531                  * returned bh. */
1532                 *ret_bh = passed_bh;
1533                 get_bh(*ret_bh);
1534
1535                 return 0;
1536         }
1537
1538         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1539                                   OCFS2_I(inode)->ip_blkno,
1540                                   ret_bh,
1541                                   OCFS2_BH_CACHED,
1542                                   inode);
1543         if (status < 0)
1544                 mlog_errno(status);
1545
1546         return status;
1547 }
1548
1549 /*
1550  * returns < 0 error if the callback will never be called, otherwise
1551  * the result of the lock will be communicated via the callback.
1552  */
1553 int ocfs2_meta_lock_full(struct inode *inode,
1554                          struct ocfs2_journal_handle *handle,
1555                          struct buffer_head **ret_bh,
1556                          int ex,
1557                          int arg_flags)
1558 {
1559         int status, level, dlm_flags, acquired;
1560         struct ocfs2_lock_res *lockres;
1561         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1562         struct buffer_head *local_bh = NULL;
1563
1564         BUG_ON(!inode);
1565
1566         mlog_entry_void();
1567
1568         mlog(0, "inode %llu, take %s META lock\n",
1569              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1570              ex ? "EXMODE" : "PRMODE");
1571
1572         status = 0;
1573         acquired = 0;
1574         /* We'll allow faking a readonly metadata lock for
1575          * rodevices. */
1576         if (ocfs2_is_hard_readonly(osb)) {
1577                 if (ex)
1578                         status = -EROFS;
1579                 goto bail;
1580         }
1581
1582         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1583                 wait_event(osb->recovery_event,
1584                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1585
1586         acquired = 0;
1587         lockres = &OCFS2_I(inode)->ip_meta_lockres;
1588         level = ex ? LKM_EXMODE : LKM_PRMODE;
1589         dlm_flags = 0;
1590         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1591                 dlm_flags |= LKM_NOQUEUE;
1592
1593         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1594         if (status < 0) {
1595                 if (status != -EAGAIN && status != -EIOCBRETRY)
1596                         mlog_errno(status);
1597                 goto bail;
1598         }
1599
1600         /* Notify the error cleanup path to drop the cluster lock. */
1601         acquired = 1;
1602
1603         /* We wait twice because a node may have died while we were in
1604          * the lower dlm layers. The second time though, we've
1605          * committed to owning this lock so we don't allow signals to
1606          * abort the operation. */
1607         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1608                 wait_event(osb->recovery_event,
1609                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1610
1611         /*
1612          * We only see this flag if we're being called from
1613          * ocfs2_read_locked_inode(). It means we're locking an inode
1614          * which hasn't been populated yet, so clear the refresh flag
1615          * and let the caller handle it.
1616          */
1617         if (inode->i_state & I_NEW) {
1618                 status = 0;
1619                 ocfs2_complete_lock_res_refresh(lockres, 0);
1620                 goto bail;
1621         }
1622
1623         /* This is fun. The caller may want a bh back, or it may
1624          * not. ocfs2_meta_lock_update definitely wants one in, but
1625          * may or may not read one, depending on what's in the
1626          * LVB. The result of all of this is that we've *only* gone to
1627          * disk if we have to, so the complexity is worthwhile. */
1628         status = ocfs2_meta_lock_update(inode, &local_bh);
1629         if (status < 0) {
1630                 if (status != -ENOENT)
1631                         mlog_errno(status);
1632                 goto bail;
1633         }
1634
1635         if (ret_bh) {
1636                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1637                 if (status < 0) {
1638                         mlog_errno(status);
1639                         goto bail;
1640                 }
1641         }
1642
1643         if (handle) {
1644                 status = ocfs2_handle_add_lock(handle, inode);
1645                 if (status < 0)
1646                         mlog_errno(status);
1647         }
1648
1649 bail:
1650         if (status < 0) {
1651                 if (ret_bh && (*ret_bh)) {
1652                         brelse(*ret_bh);
1653                         *ret_bh = NULL;
1654                 }
1655                 if (acquired)
1656                         ocfs2_meta_unlock(inode, ex);
1657         }
1658
1659         if (local_bh)
1660                 brelse(local_bh);
1661
1662         mlog_exit(status);
1663         return status;
1664 }
1665
1666 /*
1667  * This is working around a lock inversion between tasks acquiring DLM locks
1668  * while holding a page lock and the vote thread which blocks dlm lock acquiry
1669  * while acquiring page locks.
1670  *
1671  * ** These _with_page variantes are only intended to be called from aop
1672  * methods that hold page locks and return a very specific *positive* error
1673  * code that aop methods pass up to the VFS -- test for errors with != 0. **
1674  *
1675  * The DLM is called such that it returns -EAGAIN if it would have blocked
1676  * waiting for the vote thread.  In that case we unlock our page so the vote
1677  * thread can make progress.  Once we've done this we have to return
1678  * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1679  * into the VFS who will then immediately retry the aop call.
1680  *
1681  * We do a blocking lock and immediate unlock before returning, though, so that
1682  * the lock has a great chance of being cached on this node by the time the VFS
1683  * calls back to retry the aop.    This has a potential to livelock as nodes
1684  * ping locks back and forth, but that's a risk we're willing to take to avoid
1685  * the lock inversion simply.
1686  */
1687 int ocfs2_meta_lock_with_page(struct inode *inode,
1688                               struct ocfs2_journal_handle *handle,
1689                               struct buffer_head **ret_bh,
1690                               int ex,
1691                               struct page *page)
1692 {
1693         int ret;
1694
1695         ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1696                                    OCFS2_LOCK_NONBLOCK);
1697         if (ret == -EAGAIN) {
1698                 unlock_page(page);
1699                 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1700                         ocfs2_meta_unlock(inode, ex);
1701                 ret = AOP_TRUNCATED_PAGE;
1702         }
1703
1704         return ret;
1705 }
1706
1707 void ocfs2_meta_unlock(struct inode *inode,
1708                        int ex)
1709 {
1710         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1711         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1712
1713         mlog_entry_void();
1714
1715         mlog(0, "inode %llu drop %s META lock\n",
1716              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1717              ex ? "EXMODE" : "PRMODE");
1718
1719         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1720                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1721
1722         mlog_exit_void();
1723 }
1724
1725 int ocfs2_super_lock(struct ocfs2_super *osb,
1726                      int ex)
1727 {
1728         int status;
1729         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1730         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1731         struct buffer_head *bh;
1732         struct ocfs2_slot_info *si = osb->slot_info;
1733
1734         mlog_entry_void();
1735
1736         if (ocfs2_is_hard_readonly(osb))
1737                 return -EROFS;
1738
1739         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1740         if (status < 0) {
1741                 mlog_errno(status);
1742                 goto bail;
1743         }
1744
1745         /* The super block lock path is really in the best position to
1746          * know when resources covered by the lock need to be
1747          * refreshed, so we do it here. Of course, making sense of
1748          * everything is up to the caller :) */
1749         status = ocfs2_should_refresh_lock_res(lockres);
1750         if (status < 0) {
1751                 mlog_errno(status);
1752                 goto bail;
1753         }
1754         if (status) {
1755                 bh = si->si_bh;
1756                 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1757                                           si->si_inode);
1758                 if (status == 0)
1759                         ocfs2_update_slot_info(si);
1760
1761                 ocfs2_complete_lock_res_refresh(lockres, status);
1762
1763                 if (status < 0)
1764                         mlog_errno(status);
1765         }
1766 bail:
1767         mlog_exit(status);
1768         return status;
1769 }
1770
1771 void ocfs2_super_unlock(struct ocfs2_super *osb,
1772                         int ex)
1773 {
1774         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1775         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1776
1777         ocfs2_cluster_unlock(osb, lockres, level);
1778 }
1779
1780 int ocfs2_rename_lock(struct ocfs2_super *osb)
1781 {
1782         int status;
1783         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1784
1785         if (ocfs2_is_hard_readonly(osb))
1786                 return -EROFS;
1787
1788         status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1789         if (status < 0)
1790                 mlog_errno(status);
1791
1792         return status;
1793 }
1794
1795 void ocfs2_rename_unlock(struct ocfs2_super *osb)
1796 {
1797         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1798
1799         ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1800 }
1801
1802 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1803 {
1804         int ret;
1805         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1806         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1807         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1808
1809         BUG_ON(!dl);
1810
1811         if (ocfs2_is_hard_readonly(osb))
1812                 return -EROFS;
1813
1814         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1815         if (ret < 0)
1816                 mlog_errno(ret);
1817
1818         return ret;
1819 }
1820
1821 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1822 {
1823         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1824         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1825         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1826
1827         ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1828 }
1829
1830 /* Reference counting of the dlm debug structure. We want this because
1831  * open references on the debug inodes can live on after a mount, so
1832  * we can't rely on the ocfs2_super to always exist. */
1833 static void ocfs2_dlm_debug_free(struct kref *kref)
1834 {
1835         struct ocfs2_dlm_debug *dlm_debug;
1836
1837         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1838
1839         kfree(dlm_debug);
1840 }
1841
1842 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1843 {
1844         if (dlm_debug)
1845                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1846 }
1847
1848 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
1849 {
1850         kref_get(&debug->d_refcnt);
1851 }
1852
1853 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1854 {
1855         struct ocfs2_dlm_debug *dlm_debug;
1856
1857         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1858         if (!dlm_debug) {
1859                 mlog_errno(-ENOMEM);
1860                 goto out;
1861         }
1862
1863         kref_init(&dlm_debug->d_refcnt);
1864         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1865         dlm_debug->d_locking_state = NULL;
1866 out:
1867         return dlm_debug;
1868 }
1869
/* Per-open state of the "locking_state" debugfs file; allocated in
 * ocfs2_dlm_debug_open() and stored in seq_file->private. */
1870 /* Access to this is arbitrated for us via seq_file->sem. */
1871 struct ocfs2_dlm_seq_priv {
             /* Referenced for the lifetime of the open file (get in open,
              * put in release). */
1872         struct ocfs2_dlm_debug *p_dlm_debug;
             /* Dummy lockres linked into the tracking list as this reader's
              * cursor; recognisable by its NULL l_ops. */
1873         struct ocfs2_lock_res p_iter_res;
             /* Snapshot of the lockres currently being shown, copied while
              * ocfs2_dlm_tracking_lock is held. */
1874         struct ocfs2_lock_res p_tmp_res;
1875 };
1876
1877 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
1878                                                  struct ocfs2_dlm_seq_priv *priv)
1879 {
1880         struct ocfs2_lock_res *iter, *ret = NULL;
1881         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
1882
1883         assert_spin_locked(&ocfs2_dlm_tracking_lock);
1884
1885         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
1886                 /* discover the head of the list */
1887                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
1888                         mlog(0, "End of list found, %p\n", ret);
1889                         break;
1890                 }
1891
1892                 /* We track our "dummy" iteration lockres' by a NULL
1893                  * l_ops field. */
1894                 if (iter->l_ops != NULL) {
1895                         ret = iter;
1896                         break;
1897                 }
1898         }
1899
1900         return ret;
1901 }
1902
1903 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
1904 {
1905         struct ocfs2_dlm_seq_priv *priv = m->private;
1906         struct ocfs2_lock_res *iter;
1907
1908         spin_lock(&ocfs2_dlm_tracking_lock);
1909         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
1910         if (iter) {
1911                 /* Since lockres' have the lifetime of their container
1912                  * (which can be inodes, ocfs2_supers, etc) we want to
1913                  * copy this out to a temporary lockres while still
1914                  * under the spinlock. Obviously after this we can't
1915                  * trust any pointers on the copy returned, but that's
1916                  * ok as the information we want isn't typically held
1917                  * in them. */
1918                 priv->p_tmp_res = *iter;
1919                 iter = &priv->p_tmp_res;
1920         }
1921         spin_unlock(&ocfs2_dlm_tracking_lock);
1922
1923         return iter;
1924 }
1925
/* seq_file ->stop: intentionally empty.  ->start and ->next release the
 * tracking spinlock themselves before returning, so nothing is held here. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
	/* nothing to undo */
}
1929
1930 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
1931 {
1932         struct ocfs2_dlm_seq_priv *priv = m->private;
1933         struct ocfs2_lock_res *iter = v;
1934         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
1935
1936         spin_lock(&ocfs2_dlm_tracking_lock);
1937         iter = ocfs2_dlm_next_res(iter, priv);
1938         list_del_init(&dummy->l_debug_list);
1939         if (iter) {
1940                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
1941                 priv->p_tmp_res = *iter;
1942                 iter = &priv->p_tmp_res;
1943         }
1944         spin_unlock(&ocfs2_dlm_tracking_lock);
1945
1946         return iter;
1947 }
1948
1949 /* So that debugfs.ocfs2 can determine which format is being used */
1950 #define OCFS2_DLM_DEBUG_STR_VERSION 1
1951 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1952 {
1953         int i;
1954         char *lvb;
1955         struct ocfs2_lock_res *lockres = v;
1956
1957         if (!lockres)
1958                 return -EINVAL;
1959
1960         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1961
1962         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
1963                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
1964                            lockres->l_name,
1965                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
1966         else
1967                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
1968
1969         seq_printf(m, "%d\t"
1970                    "0x%lx\t"
1971                    "0x%x\t"
1972                    "0x%x\t"
1973                    "%u\t"
1974                    "%u\t"
1975                    "%d\t"
1976                    "%d\t",
1977                    lockres->l_level,
1978                    lockres->l_flags,
1979                    lockres->l_action,
1980                    lockres->l_unlock_action,
1981                    lockres->l_ro_holders,
1982                    lockres->l_ex_holders,
1983                    lockres->l_requested,
1984                    lockres->l_blocking);
1985
1986         /* Dump the raw LVB */
1987         lvb = lockres->l_lksb.lvb;
1988         for(i = 0; i < DLM_LVB_LEN; i++)
1989                 seq_printf(m, "0x%x\t", lvb[i]);
1990
1991         /* End the line */
1992         seq_printf(m, "\n");
1993         return 0;
1994 }
1995
1996 static struct seq_operations ocfs2_dlm_seq_ops = {
1997         .start =        ocfs2_dlm_seq_start,
1998         .stop =         ocfs2_dlm_seq_stop,
1999         .next =         ocfs2_dlm_seq_next,
2000         .show =         ocfs2_dlm_seq_show,
2001 };
2002
2003 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2004 {
2005         struct seq_file *seq = (struct seq_file *) file->private_data;
2006         struct ocfs2_dlm_seq_priv *priv = seq->private;
2007         struct ocfs2_lock_res *res = &priv->p_iter_res;
2008
2009         ocfs2_remove_lockres_tracking(res);
2010         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2011         return seq_release_private(inode, file);
2012 }
2013
2014 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2015 {
2016         int ret;
2017         struct ocfs2_dlm_seq_priv *priv;
2018         struct seq_file *seq;
2019         struct ocfs2_super *osb;
2020
2021         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2022         if (!priv) {
2023                 ret = -ENOMEM;
2024                 mlog_errno(ret);
2025                 goto out;
2026         }
2027         osb = (struct ocfs2_super *) inode->u.generic_ip;
2028         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2029         priv->p_dlm_debug = osb->osb_dlm_debug;
2030         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2031
2032         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2033         if (ret) {
2034                 kfree(priv);
2035                 mlog_errno(ret);
2036                 goto out;
2037         }
2038
2039         seq = (struct seq_file *) file->private_data;
2040         seq->private = priv;
2041
2042         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2043                                    priv->p_dlm_debug);
2044
2045 out:
2046         return ret;
2047 }
2048
2049 static const struct file_operations ocfs2_dlm_debug_fops = {
2050         .open =         ocfs2_dlm_debug_open,
2051         .release =      ocfs2_dlm_debug_release,
2052         .read =         seq_read,
2053         .llseek =       seq_lseek,
2054 };
2055
2056 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2057 {
2058         int ret = 0;
2059         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2060
2061         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2062                                                          S_IFREG|S_IRUSR,
2063                                                          osb->osb_debug_root,
2064                                                          osb,
2065                                                          &ocfs2_dlm_debug_fops);
2066         if (!dlm_debug->d_locking_state) {
2067                 ret = -EINVAL;
2068                 mlog(ML_ERROR,
2069                      "Unable to create locking state debugfs file.\n");
2070                 goto out;
2071         }
2072
2073         ocfs2_get_dlm_debug(dlm_debug);
2074 out:
2075         return ret;
2076 }
2077
2078 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2079 {
2080         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2081
2082         if (dlm_debug) {
2083                 debugfs_remove(dlm_debug->d_locking_state);
2084                 ocfs2_put_dlm_debug(dlm_debug);
2085         }
2086 }
2087
2088 int ocfs2_dlm_init(struct ocfs2_super *osb)
2089 {
2090         int status;
2091         u32 dlm_key;
2092         struct dlm_ctxt *dlm;
2093
2094         mlog_entry_void();
2095
2096         status = ocfs2_dlm_init_debug(osb);
2097         if (status < 0) {
2098                 mlog_errno(status);
2099                 goto bail;
2100         }
2101
2102         /* launch vote thread */
2103         osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2104         if (IS_ERR(osb->vote_task)) {
2105                 status = PTR_ERR(osb->vote_task);
2106                 osb->vote_task = NULL;
2107                 mlog_errno(status);
2108                 goto bail;
2109         }
2110
2111         /* used by the dlm code to make message headers unique, each
2112          * node in this domain must agree on this. */
2113         dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2114
2115         /* for now, uuid == domain */
2116         dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2117         if (IS_ERR(dlm)) {
2118                 status = PTR_ERR(dlm);
2119                 mlog_errno(status);
2120                 goto bail;
2121         }
2122
2123         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2124         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2125
2126         dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2127
2128         osb->dlm = dlm;
2129
2130         status = 0;
2131 bail:
2132         if (status < 0) {
2133                 ocfs2_dlm_shutdown_debug(osb);
2134                 if (osb->vote_task)
2135                         kthread_stop(osb->vote_task);
2136         }
2137
2138         mlog_exit(status);
2139         return status;
2140 }
2141
/*
 * Tear down the DLM state of a mount: unregister the eviction callback,
 * drop and free the super and rename lock resources, stop the vote thread,
 * leave the DLM domain and remove the debugfs state.
 * NOTE(review): the statement order looks deliberate (locks are dropped
 * while the vote thread still runs, the domain is left only after the lock
 * resources are freed) -- confirm before reordering anything here.
 */
2142 void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2143 {
2144         mlog_entry_void();
2145
2146         dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2147
             /* Drops the super and rename cluster locks. */
2148         ocfs2_drop_osb_locks(osb);
2149
             /* Cleared after stopping so the thread is never stopped twice. */
2150         if (osb->vote_task) {
2151                 kthread_stop(osb->vote_task);
2152                 osb->vote_task = NULL;
2153         }
2154
2155         ocfs2_lock_res_free(&osb->osb_super_lockres);
2156         ocfs2_lock_res_free(&osb->osb_rename_lockres);
2157
2158         dlm_unregister_domain(osb->dlm);
2159         osb->dlm = NULL;
2160
             /* Removes the debugfs file and drops its dlm_debug reference. */
2161         ocfs2_dlm_shutdown_debug(osb);
2162
2163         mlog_exit_void();
2164 }
2165
2166 static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2167 {
2168         struct ocfs2_lock_res *lockres = opaque;
2169         unsigned long flags;
2170
2171         mlog_entry_void();
2172
2173         mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2174              lockres->l_unlock_action);
2175
2176         spin_lock_irqsave(&lockres->l_lock, flags);
2177         /* We tried to cancel a convert request, but it was already
2178          * granted. All we want to do here is clear our unlock
2179          * state. The wake_up call done at the bottom is redundant
2180          * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2181          * hurt anything anyway */
2182         if (status == DLM_CANCELGRANT &&
2183             lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2184                 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2185
2186                 /* We don't clear the busy flag in this case as it
2187                  * should have been cleared by the ast which the dlm
2188                  * has called. */
2189                 goto complete_unlock;
2190         }
2191
2192         if (status != DLM_NORMAL) {
2193                 mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2194                      "unlock_action %d\n", status, lockres->l_name,
2195                      lockres->l_unlock_action);
2196                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2197                 return;
2198         }
2199
2200         switch(lockres->l_unlock_action) {
2201         case OCFS2_UNLOCK_CANCEL_CONVERT:
2202                 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2203                 lockres->l_action = OCFS2_AST_INVALID;
2204                 break;
2205         case OCFS2_UNLOCK_DROP_LOCK:
2206                 lockres->l_level = LKM_IVMODE;
2207                 break;
2208         default:
2209                 BUG();
2210         }
2211
2212         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2213 complete_unlock:
2214         lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2215         spin_unlock_irqrestore(&lockres->l_lock, flags);
2216
2217         wake_up(&lockres->l_event);
2218
2219         mlog_exit_void();
2220 }
2221
/* Hook invoked by ocfs2_drop_lock() just before a lock is dropped; receives
 * the lockres and the drop_data registered alongside it. */
2222 typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);
2223
/* Optional pre-drop callback handed to ocfs2_drop_lock(). */
2224 struct drop_lock_cb {
             /* Called with lockres->l_lock held, once the lock is no
              * longer busy. */
2225         ocfs2_pre_drop_cb_t     *drop_func;
             /* Passed through unchanged as drop_func's second argument. */
2226         void                    *drop_data;
2227 };
2228
/*
 * Drop the DLM lock backing @lockres and wait for the drop to complete.
 *
 * The lockres must already be marked OCFS2_LOCK_FREEING (asserted below).
 * @dcb, if non-NULL, supplies a callback that runs under l_lock after any
 * in-flight operation has finished and before the lock is released.
 * A failing dlmunlock() is treated as fatal (BUG), so this currently always
 * returns 0.
 */
2229 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2230                            struct ocfs2_lock_res *lockres,
2231                            struct drop_lock_cb *dcb)
2232 {
2233         enum dlm_status status;
2234         unsigned long flags;
2235         int lkm_flags = 0;
2236
2237         /* We didn't get anywhere near actually using this lockres. */
2238         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2239                 goto out;
2240
             /* Lock types that use an LVB pass LKM_VALBLK to dlmunlock(). */
2241         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2242                 lkm_flags |= LKM_VALBLK;
2243
2244         spin_lock_irqsave(&lockres->l_lock, flags);
2245
2246         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2247                         "lockres %s, flags 0x%lx\n",
2248                         lockres->l_name, lockres->l_flags);
2249
             /* Re-check BUSY under the spinlock after every wait; the lock
              * is released around the wait itself. */
2250         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2251                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2252                      "%u, unlock_action = %u\n",
2253                      lockres->l_name, lockres->l_flags, lockres->l_action,
2254                      lockres->l_unlock_action);
2255
2256                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2257
2258                 /* XXX: Today we just wait on any busy
2259                  * locks... Perhaps we need to cancel converts in the
2260                  * future? */
2261                 ocfs2_wait_on_busy_lock(lockres);
2262
2263                 spin_lock_irqsave(&lockres->l_lock, flags);
2264         }
2265
             /* Pre-drop hook, still under l_lock. */
2266         if (dcb)
2267                 dcb->drop_func(lockres, dcb->drop_data);
2268
2269         if (lockres->l_flags & OCFS2_LOCK_BUSY)
2270                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2271                      lockres->l_name);
2272         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2273                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2274
             /* Never attached to a DLM lock: nothing to unlock. */
2275         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2276                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2277                 goto out;
2278         }
2279
2280         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2281
2282         /* make sure we never get here while waiting for an ast to
2283          * fire. */
2284         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2285
             /* Mark the drop as in flight; ocfs2_unlock_ast() clears BUSY
              * and resets l_unlock_action when it completes normally. */
2286         /* is this necessary? */
2287         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2288         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2289         spin_unlock_irqrestore(&lockres->l_lock, flags);
2290
2291         mlog(0, "lock %s\n", lockres->l_name);
2292
2293         status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2294                            ocfs2_unlock_ast, lockres);
2295         if (status != DLM_NORMAL) {
2296                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2297                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2298                 dlm_print_one_lock(lockres->l_lksb.lockid);
2299                 BUG();
2300         }
2301         mlog(0, "lock %s, successfull return from dlmunlock\n",
2302              lockres->l_name);
2303
             /* Presumably blocks until the unlock AST has cleared BUSY --
              * TODO confirm against ocfs2_wait_on_busy_lock(). */
2304         ocfs2_wait_on_busy_lock(lockres);
2305 out:
2306         mlog_exit(0);
2307         return 0;
2308 }
2309
2310 /* Mark the lockres as being dropped. It will no longer be
2311  * queued if blocking, but we still may have to wait on it
2312  * being dequeued from the vote thread before we can consider
2313  * it safe to drop. 
2314  *
2315  * You can *not* attempt to call cluster_lock on this lockres anymore. */
2316 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2317 {
2318         int status;
2319         struct ocfs2_mask_waiter mw;
2320         unsigned long flags;
2321
2322         ocfs2_init_mask_waiter(&mw);
2323
2324         spin_lock_irqsave(&lockres->l_lock, flags);
2325         lockres->l_flags |= OCFS2_LOCK_FREEING;
2326         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2327                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2328                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2329
2330                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2331
2332                 status = ocfs2_wait_for_mask(&mw);
2333                 if (status)
2334                         mlog_errno(status);
2335
2336                 spin_lock_irqsave(&lockres->l_lock, flags);
2337         }
2338         spin_unlock_irqrestore(&lockres->l_lock, flags);
2339 }
2340
2341 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2342                                struct ocfs2_lock_res *lockres)
2343 {
2344         int ret;
2345
2346         ocfs2_mark_lockres_freeing(lockres);
2347         ret = ocfs2_drop_lock(osb, lockres, NULL);
2348         if (ret)
2349                 mlog_errno(ret);
2350 }
2351
2352 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2353 {
2354         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2355         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2356 }
2357
2358 static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2359 {
2360         struct inode *inode = data;
2361
2362         /* the metadata lock requires a bit more work as we have an
2363          * LVB to worry about. */
2364         if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2365             lockres->l_level == LKM_EXMODE &&
2366             !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2367                 __ocfs2_stuff_meta_lvb(inode);
2368 }
2369
2370 int ocfs2_drop_inode_locks(struct inode *inode)
2371 {
2372         int status, err;
2373         struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
2374
2375         mlog_entry_void();
2376
2377         /* No need to call ocfs2_mark_lockres_freeing here -
2378          * ocfs2_clear_inode has done it for us. */
2379
2380         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2381                               &OCFS2_I(inode)->ip_data_lockres,
2382                               NULL);
2383         if (err < 0)
2384                 mlog_errno(err);
2385
2386         status = err;
2387
2388         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2389                               &OCFS2_I(inode)->ip_meta_lockres,
2390                               &meta_dcb);
2391         if (err < 0)
2392                 mlog_errno(err);
2393         if (err < 0 && !status)
2394                 status = err;
2395
2396         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2397                               &OCFS2_I(inode)->ip_rw_lockres,
2398                               NULL);
2399         if (err < 0)
2400                 mlog_errno(err);
2401         if (err < 0 && !status)
2402                 status = err;
2403
2404         mlog_exit(status);
2405         return status;
2406 }
2407
2408 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2409                                       int new_level)
2410 {
2411         assert_spin_locked(&lockres->l_lock);
2412
2413         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2414
2415         if (lockres->l_level <= new_level) {
2416                 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2417                      lockres->l_level, new_level);
2418                 BUG();
2419         }
2420
2421         mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2422              lockres->l_name, new_level, lockres->l_blocking);
2423
2424         lockres->l_action = OCFS2_AST_DOWNCONVERT;
2425         lockres->l_requested = new_level;
2426         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2427 }
2428
2429 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2430                                   struct ocfs2_lock_res *lockres,
2431                                   int new_level,
2432                                   int lvb)
2433 {
2434         int ret, dlm_flags = LKM_CONVERT;
2435         enum dlm_status status;
2436
2437         mlog_entry_void();
2438
2439         if (lvb)
2440                 dlm_flags |= LKM_VALBLK;
2441
2442         status = dlmlock(osb->dlm,
2443                          new_level,
2444                          &lockres->l_lksb,
2445                          dlm_flags,
2446                          lockres->l_name,
2447                          OCFS2_LOCK_ID_MAX_LEN - 1,
2448                          ocfs2_locking_ast,
2449                          lockres,
2450                          ocfs2_blocking_ast);
2451         if (status != DLM_NORMAL) {
2452                 ocfs2_log_dlm_error("dlmlock", status, lockres);
2453                 ret = -EINVAL;
2454                 ocfs2_recover_from_dlm_error(lockres, 1);
2455                 goto bail;
2456         }
2457
2458         ret = 0;
2459 bail:
2460         mlog_exit(ret);
2461         return ret;
2462 }
2463
2464 /* returns 1 when the caller should unlock and call dlmunlock */
2465 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2466                                         struct ocfs2_lock_res *lockres)
2467 {
2468         assert_spin_locked(&lockres->l_lock);
2469
2470         mlog_entry_void();
2471         mlog(0, "lock %s\n", lockres->l_name);
2472
2473         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2474                 /* If we're already trying to cancel a lock conversion
2475                  * then just drop the spinlock and allow the caller to
2476                  * requeue this lock. */
2477
2478                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2479                 return 0;
2480         }
2481
2482         /* were we in a convert when we got the bast fire? */
2483         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2484                lockres->l_action != OCFS2_AST_DOWNCONVERT);
2485         /* set things up for the unlockast to know to just
2486          * clear out the ast_action and unset busy, etc. */
2487         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2488
2489         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2490                         "lock %s, invalid flags: 0x%lx\n",
2491                         lockres->l_name, lockres->l_flags);
2492
2493         return 1;
2494 }
2495
2496 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2497                                 struct ocfs2_lock_res *lockres)
2498 {
2499         int ret;
2500         enum dlm_status status;
2501
2502         mlog_entry_void();
2503         mlog(0, "lock %s\n", lockres->l_name);
2504
2505         ret = 0;
2506         status = dlmunlock(osb->dlm,
2507                            &lockres->l_lksb,
2508                            LKM_CANCEL,
2509                            ocfs2_unlock_ast,
2510                            lockres);
2511         if (status != DLM_NORMAL) {
2512                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2513                 ret = -EINVAL;
2514                 ocfs2_recover_from_dlm_error(lockres, 0);
2515         }
2516
2517         mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2518
2519         mlog_exit(ret);
2520         return ret;
2521 }
2522
2523 static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
2524                                                   struct ocfs2_lock_res *lockres,
2525                                                   int new_level)
2526 {
2527         int ret;
2528
2529         mlog_entry_void();
2530
2531         BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2532
2533         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2534                 ret = 0;
2535                 mlog(0, "lockres %s currently being refreshed -- backing "
2536                      "off!\n", lockres->l_name);
2537         } else if (new_level == LKM_PRMODE)
2538                 ret = !lockres->l_ex_holders &&
2539                         ocfs2_inode_fully_checkpointed(inode);
2540         else /* Must be NLMODE we're converting to. */
2541                 ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
2542                         ocfs2_inode_fully_checkpointed(inode);
2543
2544         mlog_exit(ret);
2545         return ret;
2546 }
2547
2548 static int ocfs2_do_unblock_meta(struct inode *inode,
2549                                  int *requeue)
2550 {
2551         int new_level;
2552         int set_lvb = 0;
2553         int ret = 0;
2554         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
2555         unsigned long flags;
2556
2557         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2558
2559         mlog_entry_void();
2560
2561         spin_lock_irqsave(&lockres->l_lock, flags);
2562
2563         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2564
2565         mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
2566              lockres->l_blocking);
2567
2568         BUG_ON(lockres->l_level != LKM_EXMODE &&
2569                lockres->l_level != LKM_PRMODE);
2570
2571         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2572                 *requeue = 1;
2573                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2574                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2575                 if (ret) {
2576                         ret = ocfs2_cancel_convert(osb, lockres);
2577                         if (ret < 0)
2578                                 mlog_errno(ret);
2579                 }
2580                 goto leave;
2581         }
2582
2583         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2584
2585         mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
2586              lockres->l_level, lockres->l_blocking, new_level);
2587
2588         if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
2589                 if (lockres->l_level == LKM_EXMODE)
2590                         set_lvb = 1;
2591
2592                 /* If the lock hasn't been refreshed yet (rare), then
2593                  * our memory inode values are old and we skip
2594                  * stuffing the lvb. There's no need to actually clear
2595                  * out the lvb here as it's value is still valid. */
2596                 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2597                         if (set_lvb)
2598                                 __ocfs2_stuff_meta_lvb(inode);
2599                 } else
2600                         mlog(0, "lockres %s: downconverting stale lock!\n",
2601                              lockres->l_name);
2602
2603                 mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
2604                      "l_blocking=%d, new_level=%d\n",
2605                      lockres->l_level, lockres->l_blocking, new_level);
2606
2607                 ocfs2_prepare_downconvert(lockres, new_level);
2608                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2609                 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2610                 goto leave;
2611         }
2612         if (!ocfs2_inode_fully_checkpointed(inode))
2613                 ocfs2_start_checkpoint(osb);
2614
2615         *requeue = 1;
2616         spin_unlock_irqrestore(&lockres->l_lock, flags);
2617         ret = 0;
2618 leave:
2619         mlog_exit(ret);
2620         return ret;
2621 }
2622
2623 static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
2624                                       struct ocfs2_lock_res *lockres,
2625                                       struct ocfs2_unblock_ctl *ctl,
2626                                       ocfs2_convert_worker_t *worker)
2627 {
2628         unsigned long flags;
2629         int blocking;
2630         int new_level;
2631         int ret = 0;
2632
2633         mlog_entry_void();
2634
2635         spin_lock_irqsave(&lockres->l_lock, flags);
2636
2637         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2638
2639 recheck:
2640         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2641                 ctl->requeue = 1;
2642                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2643                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2644                 if (ret) {
2645                         ret = ocfs2_cancel_convert(osb, lockres);
2646                         if (ret < 0)
2647                                 mlog_errno(ret);
2648                 }
2649                 goto leave;
2650         }
2651
2652         /* if we're blocking an exclusive and we have *any* holders,
2653          * then requeue. */
2654         if ((lockres->l_blocking == LKM_EXMODE)
2655             && (lockres->l_ex_holders || lockres->l_ro_holders))
2656                 goto leave_requeue;
2657
2658         /* If it's a PR we're blocking, then only
2659          * requeue if we've got any EX holders */
2660         if (lockres->l_blocking == LKM_PRMODE &&
2661             lockres->l_ex_holders)
2662                 goto leave_requeue;
2663
2664         /*
2665          * Can we get a lock in this state if the holder counts are
2666          * zero? The meta data unblock code used to check this.
2667          */
2668         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2669             && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2670                 goto leave_requeue;
2671
2672         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2673
2674         if (lockres->l_ops->check_downconvert
2675             && !lockres->l_ops->check_downconvert(lockres, new_level))
2676                 goto leave_requeue;
2677
2678         /* If we get here, then we know that there are no more
2679          * incompatible holders (and anyone asking for an incompatible
2680          * lock is blocked). We can now downconvert the lock */
2681         if (!worker)
2682                 goto downconvert;
2683
2684         /* Some lockres types want to do a bit of work before
2685          * downconverting a lock. Allow that here. The worker function
2686          * may sleep, so we save off a copy of what we're blocking as
2687          * it may change while we're not holding the spin lock. */
2688         blocking = lockres->l_blocking;
2689         spin_unlock_irqrestore(&lockres->l_lock, flags);
2690
2691         ctl->unblock_action = worker(lockres, blocking);
2692
2693         if (ctl->unblock_action == UNBLOCK_STOP_POST)
2694                 goto leave;
2695
2696         spin_lock_irqsave(&lockres->l_lock, flags);
2697         if (blocking != lockres->l_blocking) {
2698                 /* If this changed underneath us, then we can't drop
2699                  * it just yet. */
2700                 goto recheck;
2701         }
2702
2703 downconvert:
2704         ctl->requeue = 0;
2705
2706         ocfs2_prepare_downconvert(lockres, new_level);
2707         spin_unlock_irqrestore(&lockres->l_lock, flags);
2708         ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
2709 leave:
2710         mlog_exit(ret);
2711         return ret;
2712
2713 leave_requeue:
2714         spin_unlock_irqrestore(&lockres->l_lock, flags);
2715         ctl->requeue = 1;
2716
2717         mlog_exit(0);
2718         return 0;
2719 }
2720
2721 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2722                                      int blocking)
2723 {
2724         struct inode *inode;
2725         struct address_space *mapping;
2726
2727         inode = ocfs2_lock_res_inode(lockres);
2728         mapping = inode->i_mapping;
2729
2730         if (filemap_fdatawrite(mapping)) {
2731                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2732                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
2733         }
2734         sync_mapping_buffers(mapping);
2735         if (blocking == LKM_EXMODE) {
2736                 truncate_inode_pages(mapping, 0);
2737                 unmap_mapping_range(mapping, 0, 0, 0);
2738         } else {
2739                 /* We only need to wait on the I/O if we're not also
2740                  * truncating pages because truncate_inode_pages waits
2741                  * for us above. We don't truncate pages if we're
2742                  * blocking anything < EXMODE because we want to keep
2743                  * them around in that case. */
2744                 filemap_fdatawait(mapping);
2745         }
2746
2747         return UNBLOCK_CONTINUE;
2748 }
2749
2750 int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2751                        struct ocfs2_unblock_ctl *ctl)
2752 {
2753         int status;
2754         struct inode *inode;
2755         struct ocfs2_super *osb;
2756
2757         mlog_entry_void();
2758
2759         inode = ocfs2_lock_res_inode(lockres);
2760         osb = OCFS2_SB(inode->i_sb);
2761
2762         mlog(0, "unblock inode %llu\n",
2763              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2764
2765         status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2766                                             ocfs2_data_convert_worker);
2767         if (status < 0)
2768                 mlog_errno(status);
2769
2770         mlog(0, "inode %llu, requeue = %d\n",
2771              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2772
2773         mlog_exit(status);
2774         return status;
2775 }
2776
2777 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2778                                     struct ocfs2_unblock_ctl *ctl)
2779 {
2780         int status;
2781         struct inode *inode;
2782
2783         mlog_entry_void();
2784
2785         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2786
2787         inode  = ocfs2_lock_res_inode(lockres);
2788
2789         status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2790                                             lockres, ctl, NULL);
2791         if (status < 0)
2792                 mlog_errno(status);
2793
2794         mlog_exit(status);
2795         return status;
2796 }
2797
2798 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2799                               struct ocfs2_unblock_ctl *ctl)
2800 {
2801         int status;
2802         struct inode *inode;
2803
2804         mlog_entry_void();
2805
2806         inode = ocfs2_lock_res_inode(lockres);
2807
2808         mlog(0, "unblock inode %llu\n",
2809              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2810
2811         status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
2812         if (status < 0)
2813                 mlog_errno(status);
2814
2815         mlog(0, "inode %llu, requeue = %d\n",
2816              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2817
2818         mlog_exit(status);
2819         return status;
2820 }
2821
/*
 * Post-unlock hook for dentry locks: does the final reference drop
 * on our dentry lock. Right now this happens in the vote thread, but
 * we could choose to simplify the dlmglue API and push these off to
 * the ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2833
2834 /*
2835  * d_delete() matching dentries before the lock downconvert.
2836  *
2837  * At this point, any process waiting to destroy the
2838  * dentry_lock due to last ref count is stopped by the
2839  * OCFS2_LOCK_QUEUED flag.
2840  *
2841  * We have two potential problems
2842  *
2843  * 1) If we do the last reference drop on our dentry_lock (via dput)
2844  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2845  *    the downconvert to finish. Instead we take an elevated
2846  *    reference and push the drop until after we've completed our
2847  *    unblock processing.
2848  *
2849  * 2) There might be another process with a final reference,
2850  *    waiting on us to finish processing. If this is the case, we
2851  *    detect it and exit out - there's no more dentries anyway.
2852  */
2853 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2854                                        int blocking)
2855 {
2856         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2857         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2858         struct dentry *dentry;
2859         unsigned long flags;
2860         int extra_ref = 0;
2861
2862         /*
2863          * This node is blocking another node from getting a read
2864          * lock. This happens when we've renamed within a
2865          * directory. We've forced the other nodes to d_delete(), but
2866          * we never actually dropped our lock because it's still
2867          * valid. The downconvert code will retain a PR for this node,
2868          * so there's no further work to do.
2869          */
2870         if (blocking == LKM_PRMODE)
2871                 return UNBLOCK_CONTINUE;
2872
2873         /*
2874          * Mark this inode as potentially orphaned. The code in
2875          * ocfs2_delete_inode() will figure out whether it actually
2876          * needs to be freed or not.
2877          */
2878         spin_lock(&oi->ip_lock);
2879         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2880         spin_unlock(&oi->ip_lock);
2881
2882         /*
2883          * Yuck. We need to make sure however that the check of
2884          * OCFS2_LOCK_FREEING and the extra reference are atomic with
2885          * respect to a reference decrement or the setting of that
2886          * flag.
2887          */
2888         spin_lock_irqsave(&lockres->l_lock, flags);
2889         spin_lock(&dentry_attach_lock);
2890         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2891             && dl->dl_count) {
2892                 dl->dl_count++;
2893                 extra_ref = 1;
2894         }
2895         spin_unlock(&dentry_attach_lock);
2896         spin_unlock_irqrestore(&lockres->l_lock, flags);
2897
2898         mlog(0, "extra_ref = %d\n", extra_ref);
2899
2900         /*
2901          * We have a process waiting on us in ocfs2_dentry_iput(),
2902          * which means we can't have any more outstanding
2903          * aliases. There's no need to do any more work.
2904          */
2905         if (!extra_ref)
2906                 return UNBLOCK_CONTINUE;
2907
2908         spin_lock(&dentry_attach_lock);
2909         while (1) {
2910                 dentry = ocfs2_find_local_alias(dl->dl_inode,
2911                                                 dl->dl_parent_blkno, 1);
2912                 if (!dentry)
2913                         break;
2914                 spin_unlock(&dentry_attach_lock);
2915
2916                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2917                      dentry->d_name.name);
2918
2919                 /*
2920                  * The following dcache calls may do an
2921                  * iput(). Normally we don't want that from the
2922                  * downconverting thread, but in this case it's ok
2923                  * because the requesting node already has an
2924                  * exclusive lock on the inode, so it can't be queued
2925                  * for a downconvert.
2926                  */
2927                 d_delete(dentry);
2928                 dput(dentry);
2929
2930                 spin_lock(&dentry_attach_lock);
2931         }
2932         spin_unlock(&dentry_attach_lock);
2933
2934         /*
2935          * If we are the last holder of this dentry lock, there is no
2936          * reason to downconvert so skip straight to the unlock.
2937          */
2938         if (dl->dl_count == 1)
2939                 return UNBLOCK_STOP_POST;
2940
2941         return UNBLOCK_CONTINUE_POST;
2942 }
2943
2944 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
2945                                      struct ocfs2_unblock_ctl *ctl)
2946 {
2947         int ret;
2948         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2949         struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
2950
2951         mlog(0, "unblock dentry lock: %llu\n",
2952              (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
2953
2954         ret = ocfs2_generic_unblock_lock(osb,
2955                                          lockres,
2956                                          ctl,
2957                                          ocfs2_dentry_convert_worker);
2958         if (ret < 0)
2959                 mlog_errno(ret);
2960
2961         mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
2962
2963         return ret;
2964 }
2965
2966 /* Generic unblock function for any lockres whose private data is an
2967  * ocfs2_super pointer. */
2968 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
2969                                   struct ocfs2_unblock_ctl *ctl)
2970 {
2971         int status;
2972         struct ocfs2_super *osb;
2973
2974         mlog_entry_void();
2975
2976         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2977
2978         osb = ocfs2_get_lockres_osb(lockres);
2979
2980         status = ocfs2_generic_unblock_lock(osb,
2981                                             lockres,
2982                                             ctl,
2983                                             NULL);
2984         if (status < 0)
2985                 mlog_errno(status);
2986
2987         mlog_exit(status);
2988         return status;
2989 }
2990
2991 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
2992                                 struct ocfs2_lock_res *lockres)
2993 {
2994         int status;
2995         struct ocfs2_unblock_ctl ctl = {0, 0,};
2996         unsigned long flags;
2997
2998         /* Our reference to the lockres in this function can be
2999          * considered valid until we remove the OCFS2_LOCK_QUEUED
3000          * flag. */
3001
3002         mlog_entry_void();
3003
3004         BUG_ON(!lockres);
3005         BUG_ON(!lockres->l_ops);
3006         BUG_ON(!lockres->l_ops->unblock);
3007
3008         mlog(0, "lockres %s blocked.\n", lockres->l_name);
3009
3010         /* Detect whether a lock has been marked as going away while
3011          * the vote thread was processing other things. A lock can
3012          * still be marked with OCFS2_LOCK_FREEING after this check,
3013          * but short circuiting here will still save us some
3014          * performance. */
3015         spin_lock_irqsave(&lockres->l_lock, flags);
3016         if (lockres->l_flags & OCFS2_LOCK_FREEING)
3017                 goto unqueue;
3018         spin_unlock_irqrestore(&lockres->l_lock, flags);
3019
3020         status = lockres->l_ops->unblock(lockres, &ctl);
3021         if (status < 0)
3022                 mlog_errno(status);
3023
3024         spin_lock_irqsave(&lockres->l_lock, flags);
3025 unqueue:
3026         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3027                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3028         } else
3029                 ocfs2_schedule_blocked_lock(osb, lockres);
3030
3031         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3032              ctl.requeue ? "yes" : "no");
3033         spin_unlock_irqrestore(&lockres->l_lock, flags);
3034
3035         if (ctl.unblock_action != UNBLOCK_CONTINUE
3036             && lockres->l_ops->post_unlock)
3037                 lockres->l_ops->post_unlock(osb, lockres);
3038
3039         mlog_exit_void();
3040 }
3041
3042 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3043                                         struct ocfs2_lock_res *lockres)
3044 {
3045         mlog_entry_void();
3046
3047         assert_spin_locked(&lockres->l_lock);
3048
3049         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3050                 /* Do not schedule a lock for downconvert when it's on
3051                  * the way to destruction - any nodes wanting access
3052                  * to the resource will get it soon. */
3053                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3054                      lockres->l_name, lockres->l_flags);
3055                 return;
3056         }
3057
3058         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3059
3060         spin_lock(&osb->vote_task_lock);
3061         if (list_empty(&lockres->l_blocked_list)) {
3062                 list_add_tail(&lockres->l_blocked_list,
3063                               &osb->blocked_lock_list);
3064                 osb->blocked_lock_count++;
3065         }
3066         spin_unlock(&osb->vote_task_lock);
3067
3068         mlog_exit_void();
3069 }
3070
3071 /* This aids in debugging situations where a bad LVB might be involved. */
3072 void ocfs2_dump_meta_lvb_info(u64 level,
3073                               const char *function,
3074                               unsigned int line,
3075                               struct ocfs2_lock_res *lockres)
3076 {
3077         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3078
3079         mlog(level, "LVB information for %s (called from %s:%u):\n",
3080              lockres->l_name, function, line);
3081         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3082              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3083              be32_to_cpu(lvb->lvb_igeneration));
3084         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3085              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3086              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3087              be16_to_cpu(lvb->lvb_imode));
3088         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3089              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3090              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3091              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3092              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3093              be32_to_cpu(lvb->lvb_iattr));
3094 }