1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
10 *******************************************************************************
11 ******************************************************************************/
13 /* Central locking logic has four stages:
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
54 L: send_xxxx() -> R: receive_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
63 #include "requestqueue.h"
67 #include "lockspace.h"
72 #include "lvb_table.h"
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 void dlm_timeout_warn(struct dlm_lkb *lkb);
94 * Lock compatibility matrix - thanks Steve
95 * UN = Unlocked state. Not really a state, used as a flag
96 * PD = Padding. Used to make the matrix a nice power of two in size
97 * Other states are the same as the VMS DLM.
98 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
101 static const int __dlm_compat_matrix[8][8] = {
102 /* UN NL CR CW PR PW EX PD */
103 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
104 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
105 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
106 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
107 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
108 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
109 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
110 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
114 * This defines the direction of transfer of LVB data.
115 * Granted mode is the row; requested mode is the column.
116 * Usage: matrix[grmode+1][rqmode+1]
117 * 1 = LVB is returned to the caller
118 * 0 = LVB is written to the resource
119 * -1 = nothing happens to the LVB
122 const int dlm_lvb_operations[8][8] = {
123 /* UN NL CR CW PR PW EX PD*/
124 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
125 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
126 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
127 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
128 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
129 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
130 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
131 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
134 #define modes_compat(gr, rq) \
135 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 int dlm_modes_compat(int mode1, int mode2)
139 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
143 * Compatibility matrix for conversions with QUECVT set.
144 * Granted mode is the row; requested mode is the column.
145 * Usage: matrix[grmode+1][rqmode+1]
148 static const int __quecvt_compat_matrix[8][8] = {
149 /* UN NL CR CW PR PW EX PD */
150 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
151 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
152 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
153 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
154 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
155 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
156 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
157 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
160 void dlm_print_lkb(struct dlm_lkb *lkb)
162 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
164 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
169 void dlm_print_rsb(struct dlm_rsb *r)
171 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172 r->res_nodeid, r->res_flags, r->res_first_lkid,
173 r->res_recover_locks_count, r->res_name);
176 void dlm_dump_rsb(struct dlm_rsb *r)
182 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184 printk(KERN_ERR "rsb lookup list\n");
185 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
187 printk(KERN_ERR "rsb grant queue:\n");
188 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
190 printk(KERN_ERR "rsb convert queue:\n");
191 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
193 printk(KERN_ERR "rsb wait queue:\n");
194 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
198 /* Threads cannot use the lockspace while it's being recovered */
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
202 down_read(&ls->ls_in_recovery);
205 void dlm_unlock_recovery(struct dlm_ls *ls)
207 up_read(&ls->ls_in_recovery);
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
212 return down_read_trylock(&ls->ls_in_recovery);
215 static inline int can_be_queued(struct dlm_lkb *lkb)
217 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
222 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
225 static inline int is_demoted(struct dlm_lkb *lkb)
227 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
230 static inline int is_altmode(struct dlm_lkb *lkb)
232 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
235 static inline int is_granted(struct dlm_lkb *lkb)
237 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
240 static inline int is_remote(struct dlm_rsb *r)
242 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243 return !!r->res_nodeid;
246 static inline int is_process_copy(struct dlm_lkb *lkb)
248 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
251 static inline int is_master_copy(struct dlm_lkb *lkb)
253 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
258 static inline int middle_conversion(struct dlm_lkb *lkb)
260 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
266 static inline int down_conversion(struct dlm_lkb *lkb)
268 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
273 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
278 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
281 static inline int is_overlap(struct dlm_lkb *lkb)
283 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284 DLM_IFL_OVERLAP_CANCEL));
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
289 if (is_master_copy(lkb))
294 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297 timeout caused the cancel then return -ETIMEDOUT */
298 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303 lkb->lkb_lksb->sb_status = rv;
304 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
306 dlm_add_ast(lkb, AST_COMP);
309 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
317 if (is_master_copy(lkb))
318 send_bast(r, lkb, rqmode);
320 lkb->lkb_bastmode = rqmode;
321 dlm_add_ast(lkb, AST_BAST);
326 * Basic operations on rsb's and lkb's
329 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
333 r = allocate_rsb(ls, len);
339 memcpy(r->res_name, name, len);
340 mutex_init(&r->res_mutex);
342 INIT_LIST_HEAD(&r->res_lookup);
343 INIT_LIST_HEAD(&r->res_grantqueue);
344 INIT_LIST_HEAD(&r->res_convertqueue);
345 INIT_LIST_HEAD(&r->res_waitqueue);
346 INIT_LIST_HEAD(&r->res_root_list);
347 INIT_LIST_HEAD(&r->res_recover_list);
352 static int search_rsb_list(struct list_head *head, char *name, int len,
353 unsigned int flags, struct dlm_rsb **r_ret)
358 list_for_each_entry(r, head, res_hashchain) {
359 if (len == r->res_length && !memcmp(name, r->res_name, len))
365 if (r->res_nodeid && (flags & R_MASTER))
371 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
372 unsigned int flags, struct dlm_rsb **r_ret)
377 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
379 kref_get(&r->res_ref);
382 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
386 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
388 if (dlm_no_directory(ls))
391 if (r->res_nodeid == -1) {
392 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
393 r->res_first_lkid = 0;
394 } else if (r->res_nodeid > 0) {
395 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
396 r->res_first_lkid = 0;
398 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
399 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
406 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
407 unsigned int flags, struct dlm_rsb **r_ret)
410 write_lock(&ls->ls_rsbtbl[b].lock);
411 error = _search_rsb(ls, name, len, b, flags, r_ret);
412 write_unlock(&ls->ls_rsbtbl[b].lock);
417 * Find rsb in rsbtbl and potentially create/add one
419 * Delaying the release of rsb's has a similar benefit to applications keeping
420 * NL locks on an rsb, but without the guarantee that the cached master value
421 * will still be valid when the rsb is reused. Apps aren't always smart enough
422 * to keep NL locks on an rsb that they may lock again shortly; this can lead
423 * to excessive master lookups and removals if we don't delay the release.
425 * Searching for an rsb means looking through both the normal list and toss
426 * list. When found on the toss list the rsb is moved to the normal list with
427 * ref count of 1; when found on normal list the ref count is incremented.
430 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
431 unsigned int flags, struct dlm_rsb **r_ret)
433 struct dlm_rsb *r, *tmp;
434 uint32_t hash, bucket;
437 if (dlm_no_directory(ls))
440 hash = jhash(name, namelen, 0);
441 bucket = hash & (ls->ls_rsbtbl_size - 1);
443 error = search_rsb(ls, name, namelen, bucket, flags, &r);
447 if (error == -EBADR && !(flags & R_CREATE))
450 /* the rsb was found but wasn't a master copy */
451 if (error == -ENOTBLK)
455 r = create_rsb(ls, name, namelen);
460 r->res_bucket = bucket;
462 kref_init(&r->res_ref);
464 /* With no directory, the master can be set immediately */
465 if (dlm_no_directory(ls)) {
466 int nodeid = dlm_dir_nodeid(r);
467 if (nodeid == dlm_our_nodeid())
469 r->res_nodeid = nodeid;
472 write_lock(&ls->ls_rsbtbl[bucket].lock);
473 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
475 write_unlock(&ls->ls_rsbtbl[bucket].lock);
480 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
481 write_unlock(&ls->ls_rsbtbl[bucket].lock);
488 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
489 unsigned int flags, struct dlm_rsb **r_ret)
491 return find_rsb(ls, name, namelen, flags, r_ret);
494 /* This is only called to add a reference when the code already holds
495 a valid reference to the rsb, so there's no need for locking. */
497 static inline void hold_rsb(struct dlm_rsb *r)
499 kref_get(&r->res_ref);
502 void dlm_hold_rsb(struct dlm_rsb *r)
507 static void toss_rsb(struct kref *kref)
509 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
510 struct dlm_ls *ls = r->res_ls;
512 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
513 kref_init(&r->res_ref);
514 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
515 r->res_toss_time = jiffies;
517 free_lvb(r->res_lvbptr);
518 r->res_lvbptr = NULL;
522 /* When all references to the rsb are gone it's transfered to
523 the tossed list for later disposal. */
525 static void put_rsb(struct dlm_rsb *r)
527 struct dlm_ls *ls = r->res_ls;
528 uint32_t bucket = r->res_bucket;
530 write_lock(&ls->ls_rsbtbl[bucket].lock);
531 kref_put(&r->res_ref, toss_rsb);
532 write_unlock(&ls->ls_rsbtbl[bucket].lock);
535 void dlm_put_rsb(struct dlm_rsb *r)
540 /* See comment for unhold_lkb */
542 static void unhold_rsb(struct dlm_rsb *r)
545 rv = kref_put(&r->res_ref, toss_rsb);
546 DLM_ASSERT(!rv, dlm_dump_rsb(r););
549 static void kill_rsb(struct kref *kref)
551 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
553 /* All work is done after the return from kref_put() so we
554 can release the write_lock before the remove and free. */
556 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
557 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
558 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
559 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
560 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
561 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
564 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565 The rsb must exist as long as any lkb's for it do. */
567 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
570 lkb->lkb_resource = r;
573 static void detach_lkb(struct dlm_lkb *lkb)
575 if (lkb->lkb_resource) {
576 put_rsb(lkb->lkb_resource);
577 lkb->lkb_resource = NULL;
581 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
583 struct dlm_lkb *lkb, *tmp;
587 lkb = allocate_lkb(ls);
591 lkb->lkb_nodeid = -1;
592 lkb->lkb_grmode = DLM_LOCK_IV;
593 kref_init(&lkb->lkb_ref);
594 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
595 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
596 INIT_LIST_HEAD(&lkb->lkb_time_list);
598 get_random_bytes(&bucket, sizeof(bucket));
599 bucket &= (ls->ls_lkbtbl_size - 1);
601 write_lock(&ls->ls_lkbtbl[bucket].lock);
603 /* counter can roll over so we must verify lkid is not in use */
606 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
608 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
610 if (tmp->lkb_id != lkid)
618 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
619 write_unlock(&ls->ls_lkbtbl[bucket].lock);
625 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
628 uint16_t bucket = (lkid >> 16);
630 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
631 if (lkb->lkb_id == lkid)
637 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
640 uint16_t bucket = (lkid >> 16);
642 if (bucket >= ls->ls_lkbtbl_size)
645 read_lock(&ls->ls_lkbtbl[bucket].lock);
646 lkb = __find_lkb(ls, lkid);
648 kref_get(&lkb->lkb_ref);
649 read_unlock(&ls->ls_lkbtbl[bucket].lock);
652 return lkb ? 0 : -ENOENT;
655 static void kill_lkb(struct kref *kref)
657 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
659 /* All work is done after the return from kref_put() so we
660 can release the write_lock before the detach_lkb */
662 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
665 /* __put_lkb() is used when an lkb may not have an rsb attached to
666 it so we need to provide the lockspace explicitly */
668 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
670 uint16_t bucket = (lkb->lkb_id >> 16);
672 write_lock(&ls->ls_lkbtbl[bucket].lock);
673 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
674 list_del(&lkb->lkb_idtbl_list);
675 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679 /* for local/process lkbs, lvbptr points to caller's lksb */
680 if (lkb->lkb_lvbptr && is_master_copy(lkb))
681 free_lvb(lkb->lkb_lvbptr);
685 write_unlock(&ls->ls_lkbtbl[bucket].lock);
690 int dlm_put_lkb(struct dlm_lkb *lkb)
694 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
695 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
697 ls = lkb->lkb_resource->res_ls;
698 return __put_lkb(ls, lkb);
701 /* This is only called to add a reference when the code already holds
702 a valid reference to the lkb, so there's no need for locking. */
704 static inline void hold_lkb(struct dlm_lkb *lkb)
706 kref_get(&lkb->lkb_ref);
709 /* This is called when we need to remove a reference and are certain
710 it's not the last ref. e.g. del_lkb is always called between a
711 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
712 put_lkb would work fine, but would involve unnecessary locking */
714 static inline void unhold_lkb(struct dlm_lkb *lkb)
717 rv = kref_put(&lkb->lkb_ref, kill_lkb);
718 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
721 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
724 struct dlm_lkb *lkb = NULL;
726 list_for_each_entry(lkb, head, lkb_statequeue)
727 if (lkb->lkb_rqmode < mode)
731 list_add_tail(new, head);
733 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
736 /* add/remove lkb to rsb's grant/convert/wait queue */
738 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
740 kref_get(&lkb->lkb_ref);
742 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
744 lkb->lkb_status = status;
747 case DLM_LKSTS_WAITING:
748 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
749 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
751 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
753 case DLM_LKSTS_GRANTED:
754 /* convention says granted locks kept in order of grmode */
755 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
758 case DLM_LKSTS_CONVERT:
759 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
760 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
762 list_add_tail(&lkb->lkb_statequeue,
763 &r->res_convertqueue);
766 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
770 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
773 list_del(&lkb->lkb_statequeue);
777 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
781 add_lkb(r, lkb, sts);
785 static int msg_reply_type(int mstype)
788 case DLM_MSG_REQUEST:
789 return DLM_MSG_REQUEST_REPLY;
790 case DLM_MSG_CONVERT:
791 return DLM_MSG_CONVERT_REPLY;
793 return DLM_MSG_UNLOCK_REPLY;
795 return DLM_MSG_CANCEL_REPLY;
797 return DLM_MSG_LOOKUP_REPLY;
802 /* add/remove lkb from global waiters list of lkb's waiting for
803 a reply from a remote node */
805 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
807 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
810 mutex_lock(&ls->ls_waiters_mutex);
812 if (is_overlap_unlock(lkb) ||
813 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
818 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
821 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
824 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830 lkb->lkb_wait_count++;
833 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
834 lkb->lkb_id, lkb->lkb_wait_type, mstype,
835 lkb->lkb_wait_count, lkb->lkb_flags);
839 DLM_ASSERT(!lkb->lkb_wait_count,
841 printk("wait_count %d\n", lkb->lkb_wait_count););
843 lkb->lkb_wait_count++;
844 lkb->lkb_wait_type = mstype;
846 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
849 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
850 lkb->lkb_id, error, lkb->lkb_flags, mstype,
851 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
852 mutex_unlock(&ls->ls_waiters_mutex);
856 /* We clear the RESEND flag because we might be taking an lkb off the waiters
857 list as part of process_requestqueue (e.g. a lookup that has an optimized
858 request reply on the requestqueue) between dlm_recover_waiters_pre() which
859 set RESEND and dlm_recover_waiters_post() */
861 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
863 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
864 int overlap_done = 0;
866 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
867 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
872 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
873 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
878 /* N.B. type of reply may not always correspond to type of original
879 msg due to lookup->request optimization, verify others? */
881 if (lkb->lkb_wait_type) {
882 lkb->lkb_wait_type = 0;
886 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
887 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
891 /* the force-unlock/cancel has completed and we haven't recvd a reply
892 to the op that was in progress prior to the unlock/cancel; we
893 give up on any reply to the earlier op. FIXME: not sure when/how
896 if (overlap_done && lkb->lkb_wait_type) {
897 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
898 lkb->lkb_id, mstype, lkb->lkb_wait_type);
899 lkb->lkb_wait_count--;
900 lkb->lkb_wait_type = 0;
903 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
905 lkb->lkb_flags &= ~DLM_IFL_RESEND;
906 lkb->lkb_wait_count--;
907 if (!lkb->lkb_wait_count)
908 list_del_init(&lkb->lkb_wait_reply);
913 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
915 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
918 mutex_lock(&ls->ls_waiters_mutex);
919 error = _remove_from_waiters(lkb, mstype);
920 mutex_unlock(&ls->ls_waiters_mutex);
924 /* Handles situations where we might be processing a "fake" or "stub" reply in
925 which we can't try to take waiters_mutex again. */
927 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
929 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
932 if (ms != &ls->ls_stub_ms)
933 mutex_lock(&ls->ls_waiters_mutex);
934 error = _remove_from_waiters(lkb, ms->m_type);
935 if (ms != &ls->ls_stub_ms)
936 mutex_unlock(&ls->ls_waiters_mutex);
940 static void dir_remove(struct dlm_rsb *r)
944 if (dlm_no_directory(r->res_ls))
947 to_nodeid = dlm_dir_nodeid(r);
948 if (to_nodeid != dlm_our_nodeid())
951 dlm_dir_remove_entry(r->res_ls, to_nodeid,
952 r->res_name, r->res_length);
955 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
956 found since they are in order of newest to oldest? */
958 static int shrink_bucket(struct dlm_ls *ls, int b)
961 int count = 0, found;
965 write_lock(&ls->ls_rsbtbl[b].lock);
966 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
968 if (!time_after_eq(jiffies, r->res_toss_time +
969 dlm_config.ci_toss_secs * HZ))
976 write_unlock(&ls->ls_rsbtbl[b].lock);
980 if (kref_put(&r->res_ref, kill_rsb)) {
981 list_del(&r->res_hashchain);
982 write_unlock(&ls->ls_rsbtbl[b].lock);
989 write_unlock(&ls->ls_rsbtbl[b].lock);
990 log_error(ls, "tossed rsb in use %s", r->res_name);
997 void dlm_scan_rsbs(struct dlm_ls *ls)
1001 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1002 shrink_bucket(ls, i);
1003 if (dlm_locking_stopped(ls))
1009 static void add_timeout(struct dlm_lkb *lkb)
1011 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1013 if (is_master_copy(lkb))
1016 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1019 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1020 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1021 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1027 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1028 mutex_lock(&ls->ls_timeout_mutex);
1030 lkb->lkb_timestamp = jiffies;
1031 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1032 mutex_unlock(&ls->ls_timeout_mutex);
1035 static void del_timeout(struct dlm_lkb *lkb)
1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1039 mutex_lock(&ls->ls_timeout_mutex);
1040 if (!list_empty(&lkb->lkb_time_list)) {
1041 list_del_init(&lkb->lkb_time_list);
1044 mutex_unlock(&ls->ls_timeout_mutex);
1047 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1048 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1049 and then lock rsb because of lock ordering in add_timeout. We may need
1050 to specify some special timeout-related bits in the lkb that are just to
1051 be accessed under the timeout_mutex. */
1053 void dlm_scan_timeout(struct dlm_ls *ls)
1056 struct dlm_lkb *lkb;
1057 int do_cancel, do_warn;
1060 if (dlm_locking_stopped(ls))
1065 mutex_lock(&ls->ls_timeout_mutex);
1066 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1068 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1069 time_after_eq(jiffies, lkb->lkb_timestamp +
1070 lkb->lkb_timeout_cs * HZ/100))
1073 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1074 time_after_eq(jiffies, lkb->lkb_timestamp +
1075 dlm_config.ci_timewarn_cs * HZ/100))
1078 if (!do_cancel && !do_warn)
1083 mutex_unlock(&ls->ls_timeout_mutex);
1085 if (!do_cancel && !do_warn)
1088 r = lkb->lkb_resource;
1093 /* clear flag so we only warn once */
1094 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1095 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1097 dlm_timeout_warn(lkb);
1101 log_debug("timeout cancel %x node %d %s", lkb->lkb_id,
1102 lkb->lkb_nodeid, r->res_name);
1103 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1104 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1106 _cancel_lock(r, lkb);
1115 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1116 dlm_recoverd before checking/setting ls_recover_begin. */
1118 void dlm_adjust_timeouts(struct dlm_ls *ls)
1120 struct dlm_lkb *lkb;
1121 long adj = jiffies - ls->ls_recover_begin;
1123 ls->ls_recover_begin = 0;
1124 mutex_lock(&ls->ls_timeout_mutex);
1125 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1126 lkb->lkb_timestamp += adj;
1127 mutex_unlock(&ls->ls_timeout_mutex);
1130 /* lkb is master or local copy */
1132 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1134 int b, len = r->res_ls->ls_lvblen;
1136 /* b=1 lvb returned to caller
1137 b=0 lvb written to rsb or invalidated
1140 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1143 if (!lkb->lkb_lvbptr)
1146 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1152 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1153 lkb->lkb_lvbseq = r->res_lvbseq;
1155 } else if (b == 0) {
1156 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1157 rsb_set_flag(r, RSB_VALNOTVALID);
1161 if (!lkb->lkb_lvbptr)
1164 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1168 r->res_lvbptr = allocate_lvb(r->res_ls);
1173 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1175 lkb->lkb_lvbseq = r->res_lvbseq;
1176 rsb_clear_flag(r, RSB_VALNOTVALID);
1179 if (rsb_flag(r, RSB_VALNOTVALID))
1180 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1183 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1185 if (lkb->lkb_grmode < DLM_LOCK_PW)
1188 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1189 rsb_set_flag(r, RSB_VALNOTVALID);
1193 if (!lkb->lkb_lvbptr)
1196 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1200 r->res_lvbptr = allocate_lvb(r->res_ls);
1205 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1207 rsb_clear_flag(r, RSB_VALNOTVALID);
1210 /* lkb is process copy (pc) */
1212 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1213 struct dlm_message *ms)
1217 if (!lkb->lkb_lvbptr)
1220 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1223 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1225 int len = receive_extralen(ms);
1226 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1227 lkb->lkb_lvbseq = ms->m_lvbseq;
1231 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1232 remove_lock -- used for unlock, removes lkb from granted
1233 revert_lock -- used for cancel, moves lkb from convert to granted
1234 grant_lock -- used for request and convert, adds lkb to granted or
1235 moves lkb from convert or waiting to granted
1237 Each of these is used for master or local copy lkb's. There is
1238 also a _pc() variation used to make the corresponding change on
1239 a process copy (pc) lkb. */
1241 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1244 lkb->lkb_grmode = DLM_LOCK_IV;
1245 /* this unhold undoes the original ref from create_lkb()
1246 so this leads to the lkb being freed */
1250 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1252 set_lvb_unlock(r, lkb);
1253 _remove_lock(r, lkb);
1256 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1258 _remove_lock(r, lkb);
1261 /* returns: 0 did nothing
1262 1 moved lock to granted
1265 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1269 lkb->lkb_rqmode = DLM_LOCK_IV;
1271 switch (lkb->lkb_status) {
1272 case DLM_LKSTS_GRANTED:
1274 case DLM_LKSTS_CONVERT:
1275 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1278 case DLM_LKSTS_WAITING:
1280 lkb->lkb_grmode = DLM_LOCK_IV;
1281 /* this unhold undoes the original ref from create_lkb()
1282 so this leads to the lkb being freed */
1287 log_print("invalid status for revert %d", lkb->lkb_status);
1292 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1294 return revert_lock(r, lkb);
1297 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1299 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1300 lkb->lkb_grmode = lkb->lkb_rqmode;
1301 if (lkb->lkb_status)
1302 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1304 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1307 lkb->lkb_rqmode = DLM_LOCK_IV;
1310 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1312 set_lvb_lock(r, lkb);
1313 _grant_lock(r, lkb);
1314 lkb->lkb_highbast = 0;
1317 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1318 struct dlm_message *ms)
1320 set_lvb_lock_pc(r, lkb, ms);
1321 _grant_lock(r, lkb);
1324 /* called by grant_pending_locks() which means an async grant message must
1325 be sent to the requesting node in addition to granting the lock if the
1326 lkb belongs to a remote node. */
1328 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1331 if (is_master_copy(lkb))
1334 queue_cast(r, lkb, 0);
1337 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1338 change the granted/requested modes. We're munging things accordingly in
1340 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1342 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1343 compatible with other granted locks */
1345 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1347 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1348 log_print("munge_demoted %x invalid reply type %d",
1349 lkb->lkb_id, ms->m_type);
1353 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1354 log_print("munge_demoted %x invalid modes gr %d rq %d",
1355 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1359 lkb->lkb_grmode = DLM_LOCK_NL;
/* Apply an ALTMODE result from the master: the request was granted in the
   alternate mode named by the ALTPR/ALTCW exflag, so rewrite our rqmode to
   match what was actually granted.  Only valid on a request reply or a
   grant message (early returns elided in this listing). */
1362 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1364 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1365 ms->m_type != DLM_MSG_GRANT) {
1366 log_print("munge_altmode %x invalid reply type %d",
1367 lkb->lkb_id, ms->m_type);
1371 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1372 lkb->lkb_rqmode = DLM_LOCK_PR;
1373 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1374 lkb->lkb_rqmode = DLM_LOCK_CW;
/* neither ALT flag set: caller should not have asked for a munge */
1376 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
/* Return nonzero iff lkb is the first entry on the given status queue,
   comparing lock ids with the head entry (return statements elided in
   this listing). */
1381 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1383 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1385 if (lkb->lkb_id == first->lkb_id)
1391 /* Check if the given lkb conflicts with another lkb on the queue. */
/* Returns nonzero when some other entry's mode is incompatible with
   lkb's requested mode; the skip-self and return lines are elided in
   this listing. */
1393 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1395 struct dlm_lkb *this;
1397 list_for_each_entry(this, head, lkb_statequeue) {
1400 if (!modes_compat(this, lkb))
1407 * "A conversion deadlock arises with a pair of lock requests in the converting
1408 * queue for one resource. The granted mode of each lock blocks the requested
1409 * mode of the other lock."
1411 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1412 * convert queue from being granted, then deadlk/demote lkb.
1415 * Granted Queue: empty
1416 * Convert Queue: NL->EX (first lock)
1417 * PR->EX (second lock)
1419 * The first lock can't be granted because of the granted mode of the second
1420 * lock and the second lock can't be granted because it's not first in the
1421 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1422 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1423 * flag set and return DEMOTED in the lksb flags.
1425 * Originally, this function detected conv-deadlk in a more limited scope:
1426 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1427 * - if lkb1 was the first entry in the queue (not just earlier), and was
1428 * blocked by the granted mode of lkb2, and there was nothing on the
1429 * granted queue preventing lkb1 from being granted immediately, i.e.
1430 * lkb2 was the only thing preventing lkb1 from being granted.
1432 * That second condition meant we'd only say there was conv-deadlk if
1433 * resolving it (by demotion) would lead to the first lock on the convert
1434 * queue being granted right away. It allowed conversion deadlocks to exist
1435 * between locks on the convert queue while they couldn't be granted anyway.
1437 * Now, we detect and take action on conversion deadlocks immediately when
1438 * they're created, even if they may not be immediately consequential. If
1439 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1440 * mode that would prevent lkb1's conversion from being granted, we do a
1441 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1442 * I think this means that the lkb_is_ahead condition below should always
1443 * be zero, i.e. there will never be conv-deadlk between two locks that are
1444 * both already on the convert queue.
/* See the long comment above: scan the convert queue for an lkb1 whose
   pending conversion would be blocked by lkb2's granted mode.  Before we
   reach lkb2's own position (lkb_is_ahead == 0) a one-way incompatibility
   suffices; after it, both directions must conflict.  The self-match that
   flips lkb_is_ahead and the return statements are elided in this listing. */
1447 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1449 struct dlm_lkb *lkb1;
1450 int lkb_is_ahead = 0;
1452 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1458 if (!lkb_is_ahead) {
1459 if (!modes_compat(lkb2, lkb1))
1462 if (!modes_compat(lkb2, lkb1) &&
1463 !modes_compat(lkb1, lkb2))
1471 * Return 1 if the lock can be granted, 0 otherwise.
1472 * Also detect and resolve conversion deadlocks.
1474 * lkb is the lock to be granted
1476 * now is 1 if the function is being called in the context of the
1477 * immediate request, it is 0 if called later, after the lock has been
1480 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
/* Core grant decision (no deadlock/altmode handling -- see can_be_granted).
   Returns 1 if lkb can be granted on r, 0 otherwise.  "now" means we are
   deciding at request time, before the lkb has been queued.  The return/goto
   lines following each condition are elided in this listing. */
1483 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1485 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1488 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1489 * a new request for a NL mode lock being blocked.
1491 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1492 * request, then it would be granted. In essence, the use of this flag
1493 * tells the Lock Manager to expedite this request by not considering
1494 * what may be in the CONVERTING or WAITING queues... As of this
1495 * writing, the EXPEDITE flag can be used only with new requests for NL
1496 * mode locks. This flag is not valid for conversion requests.
1498 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1499 * conversion or used with a non-NL requested mode. We also know an
1500 * EXPEDITE request is always granted immediately, so now must always
1501 * be 1. The full condition to grant an expedite request: (now &&
1502 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1503 * therefore be shortened to just checking the flag.
1506 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1510 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1511 * added to the remaining conditions.
1514 if (queue_conflict(&r->res_grantqueue, lkb))
1518 * 6-3: By default, a conversion request is immediately granted if the
1519 * requested mode is compatible with the modes of all other granted
1523 if (queue_conflict(&r->res_convertqueue, lkb))
1527 * 6-5: But the default algorithm for deciding whether to grant or
1528 * queue conversion requests does not by itself guarantee that such
1529 * requests are serviced on a "first come first serve" basis. This, in
1530 * turn, can lead to a phenomenon known as "indefinite postponement".
1532 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1533 * the system service employed to request a lock conversion. This flag
1534 * forces certain conversion requests to be queued, even if they are
1535 * compatible with the granted modes of other locks on the same
1536 * resource. Thus, the use of this flag results in conversion requests
1537 * being ordered on a "first come first serve" basis.
1539 * DCT: This condition is all about new conversions being able to occur
1540 * "in place" while the lock remains on the granted queue (assuming
1541 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1542 * doesn't _have_ to go onto the convert queue where it's processed in
1543 * order. The "now" variable is necessary to distinguish converts
1544 * being received and processed for the first time now, because once a
1545 * convert is moved to the conversion queue the condition below applies
1546 * requiring fifo granting.
1549 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1553 * The NOORDER flag is set to avoid the standard vms rules on grant
1557 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1561 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1562 * granted until all other conversion requests ahead of it are granted
1566 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1570 * 6-4: By default, a new request is immediately granted only if all
1571 * three of the following conditions are satisfied when the request is
1573 * - The queue of ungranted conversion requests for the resource is
1575 * - The queue of ungranted new requests for the resource is empty.
1576 * - The mode of the new request is compatible with the most
1577 * restrictive mode of all granted locks on the resource.
1580 if (now && !conv && list_empty(&r->res_convertqueue) &&
1581 list_empty(&r->res_waitqueue))
1585 * 6-4: Once a lock request is in the queue of ungranted new requests,
1586 * it cannot be granted until the queue of ungranted conversion
1587 * requests is empty, all ungranted new requests ahead of it are
1588 * granted and/or canceled, and it is compatible with the granted mode
1589 * of the most restrictive lock granted on the resource.
1592 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1593 first_in_list(lkb, &r->res_waitqueue))
/* Wrapper around _can_be_granted() that also resolves conversion deadlock
   (demote grmode to NL under CONVDEADLK, else report deadlock via *err_ret
   unless NODLCKWT) and retries the grant in the ALTPR/ALTCW alternate mode,
   setting DLM_SBF_ALTMODE on success.  rqmode is restored when the alternate
   attempt fails.  Several control-flow lines are elided in this listing. */
1599 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1603 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1604 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1609 rv = _can_be_granted(r, lkb, now);
1614 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1615 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1616 * cancels one of the locks.
1619 if (is_convert && can_be_queued(lkb) &&
1620 conversion_deadlock_detect(r, lkb)) {
1621 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1622 lkb->lkb_grmode = DLM_LOCK_NL;
1623 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1624 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1628 log_print("can_be_granted deadlock %x now %d",
1637 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1638 * to grant a request in a mode other than the normal rqmode. It's a
1639 * simple way to provide a big optimization to applications that can
1643 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1645 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1649 lkb->lkb_rqmode = alt;
1650 rv = _can_be_granted(r, lkb, now);
1652 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1654 lkb->lkb_rqmode = rqmode;
1660 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1661 for locks pending on the convert list. Once verified (watch for these
1662 log_prints), we should be able to just call _can_be_granted() and not
1663 bother with the demote/deadlk cases here (and there's no easy way to deal
1664 with a deadlk here, we'd have to generate something like grant_lock with
1665 the deadlk error.) */
1667 /* returns the highest requested mode of all blocked conversions */
/* Repeatedly sweeps the convert queue granting whatever has become
   grantable; a demotion during the sweep restarts it since it may unblock
   earlier entries.  Restart/quit bookkeeping lines are elided here. */
1669 static int grant_pending_convert(struct dlm_rsb *r, int high)
1671 struct dlm_lkb *lkb, *s;
1672 int hi, demoted, quit, grant_restart, demote_restart;
1681 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1682 demoted = is_demoted(lkb);
1685 if (can_be_granted(r, lkb, 0, &deadlk)) {
1686 grant_lock_pending(r, lkb);
1691 if (!demoted && is_demoted(lkb)) {
1692 log_print("WARN: pending demoted %x node %d %s",
1693 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1699 log_print("WARN: pending deadlock %x node %d %s",
1700 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1705 hi = max_t(int, lkb->lkb_rqmode, hi);
1710 if (demote_restart && !quit) {
1715 return max_t(int, high, hi);
/* Grant every lkb on the wait (new request) queue that has become
   grantable; returns the highest rqmode among those still blocked so the
   caller can send blocking asts (final return elided in this listing). */
1718 static int grant_pending_wait(struct dlm_rsb *r, int high)
1720 struct dlm_lkb *lkb, *s;
1722 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1723 if (can_be_granted(r, lkb, 0, NULL))
1724 grant_lock_pending(r, lkb);
1726 high = max_t(int, lkb->lkb_rqmode, high);
/* Master-only: after some lock state changed, grant whatever pending
   converts/requests have become grantable, then send blocking asts to
   granted locks that conflict with the highest still-blocked rqmode. */
1732 static void grant_pending_locks(struct dlm_rsb *r)
1734 struct dlm_lkb *lkb, *s;
1735 int high = DLM_LOCK_IV;
1737 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1739 high = grant_pending_convert(r, high);
1740 high = grant_pending_wait(r, high);
/* nothing left blocked: no basts needed */
1742 if (high == DLM_LOCK_IV)
1746 * If there are locks left on the wait/convert queue then send blocking
1747 * ASTs to granted locks based on the largest requested mode (high)
1748 * found above. FIXME: highbast < high comparison not valid for PR/CW.
1751 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1752 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1753 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1754 queue_bast(r, lkb, high);
1755 lkb->lkb_highbast = high;
/* Queue a blocking ast (at lkb's rqmode) for each lock on the given
   queue that has a bast callback, conflicts with lkb, and hasn't already
   been basted at this mode or higher (tracked via lkb_highbast). */
1760 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1761 struct dlm_lkb *lkb)
1765 list_for_each_entry(gr, head, lkb_statequeue) {
1766 if (gr->lkb_bastaddr &&
1767 gr->lkb_highbast < lkb->lkb_rqmode &&
1768 !modes_compat(gr, lkb)) {
1769 queue_bast(r, gr, lkb->lkb_rqmode);
1770 gr->lkb_highbast = lkb->lkb_rqmode;
1775 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1777 send_bast_queue(r, &r->res_grantqueue, lkb);
1780 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1782 send_bast_queue(r, &r->res_grantqueue, lkb);
1783 send_bast_queue(r, &r->res_convertqueue, lkb);
1786 /* set_master(r, lkb) -- set the master nodeid of a resource
1788 The purpose of this function is to set the nodeid field in the given
1789 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1790 known, it can just be copied to the lkb and the function will return
1791 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1792 before it can be copied to the lkb.
1794 When the rsb nodeid is being looked up remotely, the initial lkb
1795 causing the lookup is kept on the ls_waiters list waiting for the
1796 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1797 on the rsb's res_lookup list until the master is verified.
1800 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1801 1: the rsb master is not available and the lkb has been placed on
1805 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1807 struct dlm_ls *ls = r->res_ls;
1808 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
/* first request after recovery must re-verify the cached master */
1810 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1811 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1812 r->res_first_lkid = lkb->lkb_id;
1813 lkb->lkb_nodeid = r->res_nodeid;
/* a lookup is already in flight for this rsb; park this lkb */
1817 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1818 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
/* res_nodeid 0 means we are the master */
1822 if (r->res_nodeid == 0) {
1823 lkb->lkb_nodeid = 0;
1827 if (r->res_nodeid > 0) {
1828 lkb->lkb_nodeid = r->res_nodeid;
1832 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1834 dir_nodeid = dlm_dir_nodeid(r);
/* remote directory: send lookup and tell caller to wait */
1836 if (dir_nodeid != our_nodeid) {
1837 r->res_first_lkid = lkb->lkb_id;
1838 send_lookup(r, lkb);
1843 /* It's possible for dlm_scand to remove an old rsb for
1844 this same resource from the toss list, us to create
1845 a new one, look up the master locally, and find it
1846 already exists just before dlm_scand does the
1847 dir_remove() on the previous rsb. */
1849 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1850 r->res_length, &ret_nodeid);
1853 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1857 if (ret_nodeid == our_nodeid) {
1858 r->res_first_lkid = 0;
1860 lkb->lkb_nodeid = 0;
1862 r->res_first_lkid = lkb->lkb_id;
1863 r->res_nodeid = ret_nodeid;
1864 lkb->lkb_nodeid = ret_nodeid;
1869 static void process_lookup_list(struct dlm_rsb *r)
1871 struct dlm_lkb *lkb, *safe;
1873 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1874 list_del_init(&lkb->lkb_rsb_lookup);
1875 _request_lock(r, lkb);
1880 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
/* Called with the result of the first request sent to a presumed master.
   On success the parked lookup-list lkbs are restarted; on a NOQUEUE
   rejection the next waiting lkb becomes the new first_lkid.  The switch
   on error and several branch lines are elided in this listing. */
1882 static void confirm_master(struct dlm_rsb *r, int error)
1884 struct dlm_lkb *lkb;
1886 if (!r->res_first_lkid)
1892 r->res_first_lkid = 0;
1893 process_lookup_list(r);
1897 /* the remote master didn't queue our NOQUEUE request;
1898 make a waiting lkb the first_lkid */
1900 r->res_first_lkid = 0;
1902 if (!list_empty(&r->res_lookup)) {
1903 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1905 list_del_init(&lkb->lkb_rsb_lookup);
1906 r->res_first_lkid = lkb->lkb_id;
1907 _request_lock(r, lkb);
1913 log_error(r->res_ls, "confirm_master unknown error %d", error);
/* Validate dlm_lock() arguments and pack them into *args.  Each failed
   check returns -EINVAL (return lines elided in this listing); the args
   are copied to the lkb later, in validate_lock_args, under the rsb lock. */
1917 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1918 int namelen, unsigned long timeout_cs, void *ast,
1919 void *astarg, void *bast, struct dlm_args *args)
1923 /* check for invalid arg usage */
1925 if (mode < 0 || mode > DLM_LOCK_EX)
1928 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1931 if (flags & DLM_LKF_CANCEL)
1934 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1937 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1940 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1943 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1946 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1949 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1952 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1958 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1961 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1964 /* these args will be copied to the lkb in validate_lock_args,
1965 it cannot be done now because when converting locks, fields in
1966 an active lkb cannot be modified before locking the rsb */
1968 args->flags = flags;
1969 args->astaddr = ast;
1970 args->astparam = (long) astarg;
1971 args->bastaddr = bast;
1972 args->timeout = timeout_cs;
/* Validate dlm_unlock() flags (only CANCEL/VALBLK/IVVALBLK/FORCEUNLOCK
   allowed, CANCEL and FORCEUNLOCK mutually exclusive) and pack them into
   *args.  Error returns are elided in this listing. */
1980 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1982 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1983 DLM_LKF_FORCEUNLOCK))
1986 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1989 args->flags = flags;
1990 args->astparam = (long) astarg;
/* Copy validated args into the lkb under the rsb lock.  For a convert,
   first reject master copies, bad QUECVT transitions, non-granted locks,
   and locks with an operation already in flight (error returns elided in
   this listing). */
1994 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1995 struct dlm_args *args)
1999 if (args->flags & DLM_LKF_CONVERT) {
2000 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2003 if (args->flags & DLM_LKF_QUECVT &&
2004 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2008 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2011 if (lkb->lkb_wait_type)
2014 if (is_overlap(lkb))
2018 lkb->lkb_exflags = args->flags;
2019 lkb->lkb_sbflags = 0;
2020 lkb->lkb_astaddr = args->astaddr;
2021 lkb->lkb_astparam = args->astparam;
2022 lkb->lkb_bastaddr = args->bastaddr;
2023 lkb->lkb_rqmode = args->mode;
2024 lkb->lkb_lksb = args->lksb;
2025 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2026 lkb->lkb_ownpid = (int) current->pid;
2027 lkb->lkb_timeout_cs = args->timeout;
2033 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2036 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2037 because there may be a lookup in progress and it's valid to do
2038 cancel/unlockf on it */
/* Validate an unlock/cancel/force-unlock against the lkb's current state,
   marking overlap flags when another operation is already in flight.
   Error returns, goto targets and the out label are elided in this
   listing. */
2040 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2042 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2045 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2046 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2051 /* an lkb may still exist even though the lock is EOL'ed due to a
2052 cancel, unlock or failed noqueue request; an app can't use these
2053 locks; return same error as if the lkid had not been found at all */
2055 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2056 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2061 /* an lkb may be waiting for an rsb lookup to complete where the
2062 lookup was initiated by another lock */
2064 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2065 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2066 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2067 list_del_init(&lkb->lkb_rsb_lookup);
2068 queue_cast(lkb->lkb_resource, lkb,
2069 args->flags & DLM_LKF_CANCEL ?
2070 -DLM_ECANCEL : -DLM_EUNLOCK);
2071 unhold_lkb(lkb); /* undoes create_lkb() */
2077 /* cancel not allowed with another cancel/unlock in progress */
2079 if (args->flags & DLM_LKF_CANCEL) {
2080 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2083 if (is_overlap(lkb))
2086 /* don't let scand try to do a cancel */
2089 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2090 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2095 switch (lkb->lkb_wait_type) {
2096 case DLM_MSG_LOOKUP:
2097 case DLM_MSG_REQUEST:
2098 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2101 case DLM_MSG_UNLOCK:
2102 case DLM_MSG_CANCEL:
2105 /* add_to_waiters() will set OVERLAP_CANCEL */
2109 /* do we need to allow a force-unlock if there's a normal unlock
2110 already in progress? in what conditions could the normal unlock
2111 fail such that we'd want to send a force-unlock to be sure? */
2113 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2114 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2117 if (is_overlap_unlock(lkb))
2120 /* don't let scand try to do a cancel */
2123 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2124 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2129 switch (lkb->lkb_wait_type) {
2130 case DLM_MSG_LOOKUP:
2131 case DLM_MSG_REQUEST:
2132 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2135 case DLM_MSG_UNLOCK:
2138 /* add_to_waiters() will set OVERLAP_UNLOCK */
2142 /* normal unlock not allowed if there's any op in progress */
2144 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2148 /* an overlapping op shouldn't blow away exflags from other op */
2149 lkb->lkb_exflags |= args->flags;
2150 lkb->lkb_sbflags = 0;
2151 lkb->lkb_astparam = args->astparam;
2155 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2156 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2157 args->flags, lkb->lkb_wait_type,
2158 lkb->lkb_resource->res_name);
2163 * Four stage 4 varieties:
2164 * do_request(), do_convert(), do_unlock(), do_cancel()
2165 * These are called on the master node for the given lock and
2166 * from the central locking logic.
/* do_request: grant immediately if possible, otherwise queue on the wait
   queue (-EINPROGRESS) and send blocking asts, otherwise fail with
   -EAGAIN for a NOQUEUE request.  Return statements elided in this
   listing. */
2169 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2173 if (can_be_granted(r, lkb, 1, NULL)) {
2175 queue_cast(r, lkb, 0);
2179 if (can_be_queued(lkb)) {
2180 error = -EINPROGRESS;
2181 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2182 send_blocking_asts(r, lkb);
2188 if (force_blocking_asts(lkb))
2189 send_blocking_asts_all(r, lkb);
2190 queue_cast(r, lkb, -EAGAIN);
/* do_convert: grant the conversion if possible; on detected conversion
   deadlock revert and cast -EDEADLK; after an auto-demotion (CONVDEADLK)
   retry once other converts have been granted; otherwise queue on the
   convert queue or fail with -EAGAIN.  Return/goto lines are elided in
   this listing. */
2196 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2201 /* changing an existing lock may allow others to be granted */
2203 if (can_be_granted(r, lkb, 1, &deadlk)) {
2205 queue_cast(r, lkb, 0);
2206 grant_pending_locks(r);
2210 /* can_be_granted() detected that this lock would block in a conversion
2211 deadlock, so we leave it on the granted queue and return EDEADLK in
2212 the ast for the convert. */
2215 /* it's left on the granted queue */
2216 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2217 lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2218 lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2219 revert_lock(r, lkb);
2220 queue_cast(r, lkb, -EDEADLK);
2225 /* is_demoted() means the can_be_granted() above set the grmode
2226 to NL, and left us on the granted queue. This auto-demotion
2227 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2228 now grantable. We have to try to grant other converting locks
2229 before we try again to grant this one. */
2231 if (is_demoted(lkb)) {
2232 grant_pending_convert(r, DLM_LOCK_IV);
2233 if (_can_be_granted(r, lkb, 1)) {
2235 queue_cast(r, lkb, 0);
2236 grant_pending_locks(r);
2239 /* else fall through and move to convert queue */
2242 if (can_be_queued(lkb)) {
2243 error = -EINPROGRESS;
2245 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2246 send_blocking_asts(r, lkb);
2252 if (force_blocking_asts(lkb))
2253 send_blocking_asts_all(r, lkb);
2254 queue_cast(r, lkb, -EAGAIN);
2260 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2262 remove_lock(r, lkb);
2263 queue_cast(r, lkb, -DLM_EUNLOCK);
2264 grant_pending_locks(r);
2265 return -DLM_EUNLOCK;
2268 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
/* do_cancel: revert the lkb to its last granted state; only if something
   was actually reverted is -DLM_ECANCEL cast and returned (the branch on
   revert_lock()'s result and the final return are elided here). */
2270 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2274 error = revert_lock(r, lkb);
2276 queue_cast(r, lkb, -DLM_ECANCEL);
2277 grant_pending_locks(r);
2278 return -DLM_ECANCEL;
2284 * Four stage 3 varieties:
2285 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2288 /* add a new lkb to a possibly new rsb, called by requesting process */
/* Resolves the master via set_master(), then either sends the request to
   the remote master or runs do_request() locally (the remote/local branch
   lines are elided in this listing). */
2290 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2294 /* set_master: sets lkb nodeid from r */
2296 error = set_master(r, lkb);
2305 /* receive_request() calls do_request() on remote node */
2306 error = send_request(r, lkb);
2308 error = do_request(r, lkb);
2313 /* change some property of an existing lkb, e.g. mode */
/* Stage 3 convert: dispatch to the remote master or the local
   do_convert() (branch lines elided in this listing). */
2315 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2320 /* receive_convert() calls do_convert() on remote node */
2321 error = send_convert(r, lkb);
2323 error = do_convert(r, lkb);
2328 /* remove an existing lkb from the granted queue */
/* Stage 3 unlock: dispatch to the remote master or the local
   do_unlock() (branch lines elided in this listing). */
2330 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2335 /* receive_unlock() calls do_unlock() on remote node */
2336 error = send_unlock(r, lkb);
2338 error = do_unlock(r, lkb);
2343 /* remove an existing lkb from the convert or wait queue */
/* Stage 3 cancel: dispatch to the remote master or the local
   do_cancel() (branch lines elided in this listing). */
2345 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2350 /* receive_cancel() calls do_cancel() on remote node */
2351 error = send_cancel(r, lkb);
2353 error = do_cancel(r, lkb);
2359 * Four stage 2 varieties:
2360 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
/* Stage 2 request: validate args, find/create and lock the rsb, publish
   the lock id to the caller's lksb, then run stage 3.  Error paths and
   rsb lock/unlock/put lines are elided in this listing. */
2363 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2364 int len, struct dlm_args *args)
2369 error = validate_lock_args(ls, lkb, args);
2373 error = find_rsb(ls, name, len, R_CREATE, &r);
2380 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2382 error = _request_lock(r, lkb);
/* Stage 2 convert: the rsb already hangs off the lkb; lock it, validate
   the convert args, run stage 3.  Lock/unlock/put lines are elided in
   this listing. */
2391 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2392 struct dlm_args *args)
2397 r = lkb->lkb_resource;
2402 error = validate_lock_args(ls, lkb, args);
2406 error = _convert_lock(r, lkb);
/* Stage 2 unlock: lock the lkb's rsb, validate the unlock args, run
   stage 3.  Lock/unlock/put lines are elided in this listing. */
2413 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2414 struct dlm_args *args)
2419 r = lkb->lkb_resource;
2424 error = validate_unlock_args(lkb, args);
2428 error = _unlock_lock(r, lkb);
/* Stage 2 cancel: lock the lkb's rsb, validate (shares unlock
   validation), run stage 3.  Lock/unlock/put lines are elided in this
   listing. */
2435 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2436 struct dlm_args *args)
2441 r = lkb->lkb_resource;
2446 error = validate_unlock_args(lkb, args);
2450 error = _cancel_lock(r, lkb);
2458 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
/* Public entry point: look up the lockspace, block out recovery, find
   (convert) or create (request) the lkb, pack args, and dispatch to
   convert_lock()/request_lock().  -EINPROGRESS/-EAGAIN/-EDEADLK handling
   normalizes the async return codes; put/cleanup lines are elided in
   this listing. */
2461 int dlm_lock(dlm_lockspace_t *lockspace,
2463 struct dlm_lksb *lksb,
2466 unsigned int namelen,
2467 uint32_t parent_lkid,
2468 void (*ast) (void *astarg),
2470 void (*bast) (void *astarg, int mode))
2473 struct dlm_lkb *lkb;
2474 struct dlm_args args;
2475 int error, convert = flags & DLM_LKF_CONVERT;
2477 ls = dlm_find_lockspace_local(lockspace);
2481 dlm_lock_recovery(ls);
/* a convert reuses the existing lkb named by sb_lkid; a request makes one */
2484 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2486 error = create_lkb(ls, &lkb);
2491 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2492 astarg, bast, &args);
2497 error = convert_lock(ls, lkb, &args);
2499 error = request_lock(ls, lkb, name, namelen, &args);
2501 if (error == -EINPROGRESS)
2504 if (convert || error)
2506 if (error == -EAGAIN || error == -EDEADLK)
2509 dlm_unlock_recovery(ls);
2510 dlm_put_lockspace(ls);
/* Public entry point: look up the lockspace and lkb, pack the unlock
   args, then dispatch to cancel_lock() (CANCEL flag) or unlock_lock().
   -DLM_EUNLOCK/-DLM_ECANCEL are success; -EBUSY with CANCEL/FORCEUNLOCK
   means the op was absorbed into one in flight.  Put/cleanup lines are
   elided in this listing. */
2514 int dlm_unlock(dlm_lockspace_t *lockspace,
2517 struct dlm_lksb *lksb,
2521 struct dlm_lkb *lkb;
2522 struct dlm_args args;
2525 ls = dlm_find_lockspace_local(lockspace);
2529 dlm_lock_recovery(ls);
2531 error = find_lkb(ls, lkid, &lkb);
2535 error = set_unlock_args(flags, astarg, &args);
2539 if (flags & DLM_LKF_CANCEL)
2540 error = cancel_lock(ls, lkb, &args);
2542 error = unlock_lock(ls, lkb, &args);
2544 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2546 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2551 dlm_unlock_recovery(ls);
2552 dlm_put_lockspace(ls);
2557 * send/receive routines for remote operations and replies
2561 * send_request receive_request
2562 * send_convert receive_convert
2563 * send_unlock receive_unlock
2564 * send_cancel receive_cancel
2565 * send_grant receive_grant
2566 * send_bast receive_bast
2567 * send_lookup receive_lookup
2568 * send_remove receive_remove
2571 * receive_request_reply send_request_reply
2572 * receive_convert_reply send_convert_reply
2573 * receive_unlock_reply send_unlock_reply
2574 * receive_cancel_reply send_cancel_reply
2575 * receive_lookup_reply send_lookup_reply
/* Allocate a lowcomms send buffer for to_nodeid, zero it, and fill in the
   common dlm_header fields plus the message type.  The allocation-failure
   return and the *ms_ret/*mh_ret assignments are elided in this listing. */
2578 static int _create_message(struct dlm_ls *ls, int mb_len,
2579 int to_nodeid, int mstype,
2580 struct dlm_message **ms_ret,
2581 struct dlm_mhandle **mh_ret)
2583 struct dlm_message *ms;
2584 struct dlm_mhandle *mh;
2587 /* get_buffer gives us a message handle (mh) that we need to
2588 pass into lowcomms_commit and a message buffer (mb) that we
2589 write our data into */
2591 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2595 memset(mb, 0, mb_len);
2597 ms = (struct dlm_message *) mb;
2599 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2600 ms->m_header.h_lockspace = ls->ls_global_id;
2601 ms->m_header.h_nodeid = dlm_our_nodeid();
2602 ms->m_header.h_length = mb_len;
2603 ms->m_header.h_cmd = DLM_MSG;
2605 ms->m_type = mstype;
/* Size the message for its type -- name-carrying messages append the
   resource name, LVB-carrying ones append ls_lvblen -- then delegate to
   _create_message().  The switch's break lines are elided in this
   listing; compare with the parallel switch in send_args(). */
2612 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2613 int to_nodeid, int mstype,
2614 struct dlm_message **ms_ret,
2615 struct dlm_mhandle **mh_ret)
2617 int mb_len = sizeof(struct dlm_message);
2620 case DLM_MSG_REQUEST:
2621 case DLM_MSG_LOOKUP:
2622 case DLM_MSG_REMOVE:
2623 mb_len += r->res_length;
2625 case DLM_MSG_CONVERT:
2626 case DLM_MSG_UNLOCK:
2627 case DLM_MSG_REQUEST_REPLY:
2628 case DLM_MSG_CONVERT_REPLY:
2630 if (lkb && lkb->lkb_lvbptr)
2631 mb_len += r->res_ls->ls_lvblen;
2635 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* convert the message to on-wire byte order, then hand the
	   prepared buffer back to lowcomms for transmission */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
/* Copy the lkb/rsb state that every outgoing message carries, then append
   the type-dependent payload: resource name for request/lookup, LVB for
   convert/unlock and their replies (break lines elided in this listing). */
2649 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2650 struct dlm_message *ms)
2652 ms->m_nodeid = lkb->lkb_nodeid;
2653 ms->m_pid = lkb->lkb_ownpid;
2654 ms->m_lkid = lkb->lkb_id;
2655 ms->m_remid = lkb->lkb_remid;
2656 ms->m_exflags = lkb->lkb_exflags;
2657 ms->m_sbflags = lkb->lkb_sbflags;
2658 ms->m_flags = lkb->lkb_flags;
2659 ms->m_lvbseq = lkb->lkb_lvbseq;
2660 ms->m_status = lkb->lkb_status;
2661 ms->m_grmode = lkb->lkb_grmode;
2662 ms->m_rqmode = lkb->lkb_rqmode;
2663 ms->m_hash = r->res_hash;
2665 /* m_result and m_bastmode are set from function args,
2666 not from lkb fields */
2668 if (lkb->lkb_bastaddr)
2669 ms->m_asts |= AST_BAST;
2670 if (lkb->lkb_astaddr)
2671 ms->m_asts |= AST_COMP;
2673 /* compare with switch in create_message; send_remove() doesn't
2676 switch (ms->m_type) {
2677 case DLM_MSG_REQUEST:
2678 case DLM_MSG_LOOKUP:
2679 memcpy(ms->m_extra, r->res_name, r->res_length);
2681 case DLM_MSG_CONVERT:
2682 case DLM_MSG_UNLOCK:
2683 case DLM_MSG_REQUEST_REPLY:
2684 case DLM_MSG_CONVERT_REPLY:
2686 if (!lkb->lkb_lvbptr)
2688 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
/* Shared path for request/convert/unlock/cancel: register the lkb on the
   waiters list (so the reply can be matched), build and send the message
   to the master.  On send failure the lkb is removed from the waiters
   list again; the goto/fail/out lines are elided in this listing. */
2693 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2695 struct dlm_message *ms;
2696 struct dlm_mhandle *mh;
2697 int to_nodeid, error;
2699 error = add_to_waiters(lkb, mstype);
2703 to_nodeid = r->res_nodeid;
2705 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2709 send_args(r, lkb, ms);
2711 error = send_message(mh, ms);
2717 remove_from_waiters(lkb, msg_reply_type(mstype));
2721 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2723 return send_common(r, lkb, DLM_MSG_REQUEST);
/* Send a convert to the master.  A down-conversion is always granted, so
   the master sends no reply; we synthesize one locally from the
   lockspace's stub message and complete the convert immediately. */
2726 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2730 error = send_common(r, lkb, DLM_MSG_CONVERT);
2732 /* down conversions go without a reply from the master */
2733 if (!error && down_conversion(lkb)) {
2734 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2735 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2736 r->res_ls->ls_stub_ms.m_result = 0;
2737 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2738 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2744 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2745 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2746 that the master is still correct. */
2748 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2750 return send_common(r, lkb, DLM_MSG_UNLOCK);
2753 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2755 return send_common(r, lkb, DLM_MSG_CANCEL);
/* Master side: notify the lock's owning node that its queued request has
   been granted (asynchronous grant, no waiters-list entry needed).  The
   error-goto lines and the m_result assignment are elided in this
   listing. */
2758 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2760 struct dlm_message *ms;
2761 struct dlm_mhandle *mh;
2762 int to_nodeid, error;
2764 to_nodeid = lkb->lkb_nodeid;
2766 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2770 send_args(r, lkb, ms);
2774 error = send_message(mh, ms);
/* Master side: send a blocking ast for mode to the lock's owning node.
   Note the NULL lkb passed to create_message -- a bast carries no LVB.
   Error-goto lines are elided in this listing. */
2779 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2781 struct dlm_message *ms;
2782 struct dlm_mhandle *mh;
2783 int to_nodeid, error;
2785 to_nodeid = lkb->lkb_nodeid;
2787 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2791 send_args(r, lkb, ms);
2793 ms->m_bastmode = mode;
2795 error = send_message(mh, ms);
/* Ask the directory node who masters this resource; the lkb waits on the
   waiters list for the DLM_MSG_LOOKUP_REPLY.  On send failure it is
   removed from the waiters list again (goto/fail lines elided in this
   listing). */
2800 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2802 struct dlm_message *ms;
2803 struct dlm_mhandle *mh;
2804 int to_nodeid, error;
2806 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2810 to_nodeid = dlm_dir_nodeid(r);
2812 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2816 send_args(r, lkb, ms);
2818 error = send_message(mh, ms);
2824 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
/* send_remove: tell the directory node to drop its entry for this rsb.
   The resource name is copied into the message's variable-length extra
   area; no lkb is involved (remove has no reply). */
2828 static int send_remove(struct dlm_rsb *r)
2830 struct dlm_message *ms;
2831 struct dlm_mhandle *mh;
2832 int to_nodeid, error;
2834 to_nodeid = dlm_dir_nodeid(r);
2836 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2840 memcpy(ms->m_extra, r->res_name, r->res_length);
2841 ms->m_hash = r->res_hash;
2843 error = send_message(mh, ms);
/* send_common_reply: build and send a reply message of the given mstype
   back to lkb->lkb_nodeid (the node that sent the original request).
   The rv argument (set into the message on an elided line) carries the
   do_xxxx() result back to the requester. */
2848 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2851 struct dlm_message *ms;
2852 struct dlm_mhandle *mh;
2853 int to_nodeid, error;
2855 to_nodeid = lkb->lkb_nodeid;
2857 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2861 send_args(r, lkb, ms);
2865 error = send_message(mh, ms);

/* One wrapper per reply type, mirroring the send_xxxx() wrappers above. */
2870 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2872 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);

2875 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2877 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);

2880 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2882 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);

2885 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2887 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
/* send_lookup_reply: answer a directory lookup.  Uses the lockspace's
   stub rsb (no real resource is needed), echoes the requester's lkid
   back so it can find its lkb, and returns the master in m_nodeid. */
2890 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2891 int ret_nodeid, int rv)
2893 struct dlm_rsb *r = &ls->ls_stub_rsb;
2894 struct dlm_message *ms;
2895 struct dlm_mhandle *mh;
2896 int error, nodeid = ms_in->m_header.h_nodeid;
2898 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2902 ms->m_lkid = ms_in->m_lkid;
2904 ms->m_nodeid = ret_nodeid;
2906 error = send_message(mh, ms);
2911 /* which args we save from a received message depends heavily on the type
2912 of message, unlike the send side where we can safely send everything about
2913 the lkb for any type of message */
/* receive_flags: copy exflags/sbflags from an incoming message and merge
   the low 16 bits of m_flags into lkb_flags, preserving the local-only
   high 16 bits (DLM_IFL_* internal flags). */
2915 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2917 lkb->lkb_exflags = ms->m_exflags;
2918 lkb->lkb_sbflags = ms->m_sbflags;
2919 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2920 (ms->m_flags & 0x0000FFFF);

/* receive_flags_reply: same merge as receive_flags() but for replies,
   which do not carry exflags. */
2923 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2925 lkb->lkb_sbflags = ms->m_sbflags;
2926 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2927 (ms->m_flags & 0x0000FFFF);

/* receive_extralen: number of bytes in the message beyond the fixed
   struct dlm_message header (the m_extra payload: name or LVB). */
2930 static int receive_extralen(struct dlm_message *ms)
2932 return (ms->m_header.h_length - sizeof(struct dlm_message));
/* receive_lvb: if the lock uses a value block (DLM_LKF_VALBLK), make sure
   the lkb has an lvb buffer (allocating one on demand) and copy the LVB
   carried in the message's extra area into it.  The !lkb_lvbptr branch
   after allocation is the (elided) -ENOMEM error return. */
2935 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2936 struct dlm_message *ms)
2940 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2941 if (!lkb->lkb_lvbptr)
2942 lkb->lkb_lvbptr = allocate_lvb(ls);
2943 if (!lkb->lkb_lvbptr)
2945 len = receive_extralen(ms);
2946 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
/* receive_request_args: populate a freshly created master-copy lkb from a
   DLM_MSG_REQUEST.  lkb_remid records the requester's lock id; the ast
   addresses are stored only as present/absent markers (AST_BAST/AST_COMP
   bits cast to pointers) since remote asts are delivered by message. */
2951 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2952 struct dlm_message *ms)
2954 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2955 lkb->lkb_ownpid = ms->m_pid;
2956 lkb->lkb_remid = ms->m_lkid;
2957 lkb->lkb_grmode = DLM_LOCK_IV;
2958 lkb->lkb_rqmode = ms->m_rqmode;
2959 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2960 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2962 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2964 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2965 /* lkb was just created so there won't be an lvb yet */
2966 lkb->lkb_lvbptr = allocate_lvb(ls);
2967 if (!lkb->lkb_lvbptr)
/* receive_convert_args: validate and apply a DLM_MSG_CONVERT to an
   existing master-copy lkb.  Rejects (via elided error returns) a nodeid
   mismatch, a non-master-copy lkb, or a lock that isn't currently
   granted; then takes the new rqmode/lvbseq and any LVB. */
2974 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2975 struct dlm_message *ms)
2977 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2978 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2979 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2980 lkb->lkb_id, lkb->lkb_remid);
2984 if (!is_master_copy(lkb))
2987 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2990 if (receive_lvb(ls, lkb, ms))
2993 lkb->lkb_rqmode = ms->m_rqmode;
2994 lkb->lkb_lvbseq = ms->m_lvbseq;
/* receive_unlock_args: validate a DLM_MSG_UNLOCK against a master-copy
   lkb and pick up any LVB it carries. */
2999 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3000 struct dlm_message *ms)
3002 if (!is_master_copy(lkb))
3004 if (receive_lvb(ls, lkb, ms))

3009 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3010 uses to send a reply and that the remote end uses to process the reply. */

3012 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3014 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3015 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3016 lkb->lkb_remid = ms->m_lkid;
/* receive_request: master-side handler for DLM_MSG_REQUEST.  Creates a
   master-copy lkb, finds/creates the rsb by name (carried in m_extra),
   runs do_request() and replies with its result.  On early failure the
   stub lkb/rsb are used to send an error reply (final two lines). */
3019 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3021 struct dlm_lkb *lkb;
3025 error = create_lkb(ls, &lkb);
3029 receive_flags(lkb, ms);
3030 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3031 error = receive_request_args(ls, lkb, ms);
3037 namelen = receive_extralen(ms);
3039 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3048 error = do_request(r, lkb);
3049 send_request_reply(r, lkb, error);
/* -EINPROGRESS means the request was queued, not an error (elided). */
3054 if (error == -EINPROGRESS)
3061 setup_stub_lkb(ls, ms);
3062 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_convert: master-side handler for DLM_MSG_CONVERT.  Looks up the
   master-copy lkb by the sender's remid, applies the convert args, runs
   do_convert() and replies — except for down-conversions, where the
   sender completes locally and no reply is sent (reply flag). */
3065 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3067 struct dlm_lkb *lkb;
3069 int error, reply = 1;
3071 error = find_lkb(ls, ms->m_remid, &lkb);
3075 r = lkb->lkb_resource;
3080 receive_flags(lkb, ms);
3081 error = receive_convert_args(ls, lkb, ms);
3084 reply = !down_conversion(lkb);
3086 error = do_convert(r, lkb);
3089 send_convert_reply(r, lkb, error);
3097 setup_stub_lkb(ls, ms);
3098 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_unlock: master-side handler for DLM_MSG_UNLOCK; same shape as
   receive_convert() with do_unlock()/send_unlock_reply(). */
3101 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3103 struct dlm_lkb *lkb;
3107 error = find_lkb(ls, ms->m_remid, &lkb);
3111 r = lkb->lkb_resource;
3116 receive_flags(lkb, ms);
3117 error = receive_unlock_args(ls, lkb, ms);
3121 error = do_unlock(r, lkb);
3123 send_unlock_reply(r, lkb, error);
3131 setup_stub_lkb(ls, ms);
3132 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_cancel: master-side handler for DLM_MSG_CANCEL; no args struct
   beyond the flags — just do_cancel() and reply. */
3135 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3137 struct dlm_lkb *lkb;
3141 error = find_lkb(ls, ms->m_remid, &lkb);
3145 receive_flags(lkb, ms);
3147 r = lkb->lkb_resource;
3152 error = do_cancel(r, lkb);
3153 send_cancel_reply(r, lkb, error);
3161 setup_stub_lkb(ls, ms);
3162 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_grant: process-copy side handler for DLM_MSG_GRANT.  The master
   granted our lock: absorb the reply flags (munging for an alternate-mode
   grant), move the lkb to the grant queue and queue the completion ast. */
3165 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3167 struct dlm_lkb *lkb;
3171 error = find_lkb(ls, ms->m_remid, &lkb);
3173 log_error(ls, "receive_grant no lkb");
3176 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3178 r = lkb->lkb_resource;
3183 receive_flags_reply(lkb, ms);
3184 if (is_altmode(lkb))
3185 munge_altmode(lkb, ms);
3186 grant_lock_pc(r, lkb, ms);
3187 queue_cast(r, lkb, 0);
/* receive_bast: process-copy side handler for DLM_MSG_BAST — queue the
   blocking ast with the mode the master put in m_bastmode. */
3194 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3196 struct dlm_lkb *lkb;
3200 error = find_lkb(ls, ms->m_remid, &lkb);
3202 log_error(ls, "receive_bast no lkb");
3205 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3207 r = lkb->lkb_resource;
3212 queue_bast(r, lkb, ms->m_bastmode);
/* receive_lookup: directory-node handler for DLM_MSG_LOOKUP.  Verifies we
   really are the directory node for the hashed name, looks up (or
   creates) the master entry, and replies — unless we are ourselves the
   master, in which case the lookup is promoted directly to a request. */
3219 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3221 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3223 from_nodeid = ms->m_header.h_nodeid;
3224 our_nodeid = dlm_our_nodeid();
3226 len = receive_extralen(ms);
3228 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3229 if (dir_nodeid != our_nodeid) {
3230 log_error(ls, "lookup dir_nodeid %d from %d",
3231 dir_nodeid, from_nodeid);
3237 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3239 /* Optimization: we're master so treat lookup as a request */
3240 if (!error && ret_nodeid == our_nodeid) {
3241 receive_request(ls, ms);
3245 send_lookup_reply(ls, ms, ret_nodeid, error);
/* receive_remove: directory-node handler for DLM_MSG_REMOVE — drop the
   directory entry for the named resource after checking we really are
   its directory node. */
3248 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3250 int len, dir_nodeid, from_nodeid;
3252 from_nodeid = ms->m_header.h_nodeid;
3254 len = receive_extralen(ms);
3256 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3257 if (dir_nodeid != dlm_our_nodeid()) {
3258 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3259 dir_nodeid, from_nodeid);
3263 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);

/* receive_purge: purge orphan locks belonging to the given nodeid/pid. */
3266 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3268 do_purge(ls, ms->m_nodeid, ms->m_pid);
/* receive_request_reply: process-copy side handler for
   DLM_MSG_REQUEST_REPLY.  Removes the lkb from the waiters list, then
   dispatches on the master's do_request() result carried in m_result:
   -EAGAIN (would block), 0/-EINPROGRESS (granted or queued), or a master
   mismatch that forces the request to be retried via _request_lock().
   Overlapping unlock/cancel requested while we waited is honored at the
   end by sending the deferred unlock/cancel. */
3271 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3273 struct dlm_lkb *lkb;
3275 int error, mstype, result;
3277 error = find_lkb(ls, ms->m_remid, &lkb);
3279 log_error(ls, "receive_request_reply no lkb");
3282 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3284 r = lkb->lkb_resource;
/* Save wait_type before remove_from_waiters() clears it. */
3288 mstype = lkb->lkb_wait_type;
3289 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3293 /* Optimization: the dir node was also the master, so it took our
3294 lookup as a request and sent request reply instead of lookup reply */
3295 if (mstype == DLM_MSG_LOOKUP) {
3296 r->res_nodeid = ms->m_header.h_nodeid;
3297 lkb->lkb_nodeid = r->res_nodeid;
3300 /* this is the value returned from do_request() on the master */
3301 result = ms->m_result;
3305 /* request would block (be queued) on remote master */
3306 queue_cast(r, lkb, -EAGAIN);
3307 confirm_master(r, -EAGAIN);
3308 unhold_lkb(lkb); /* undoes create_lkb() */
3313 /* request was queued or granted on remote master */
3314 receive_flags_reply(lkb, ms);
3315 lkb->lkb_remid = ms->m_lkid;
3316 if (is_altmode(lkb))
3317 munge_altmode(lkb, ms);
3319 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3322 grant_lock_pc(r, lkb, ms);
3323 queue_cast(r, lkb, 0);
3325 confirm_master(r, result);
3330 /* find_rsb failed to find rsb or rsb wasn't master */
3331 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3332 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
/* Master was wrong: clear our notion of it and retry the request
   (unless an overlapping unlock/cancel makes retrying pointless). */
3334 lkb->lkb_nodeid = -1;
3336 if (is_overlap(lkb)) {
3337 /* we'll ignore error in cancel/unlock reply */
3338 queue_cast_overlap(r, lkb);
3339 unhold_lkb(lkb); /* undoes create_lkb() */
3341 _request_lock(r, lkb);
3345 log_error(ls, "receive_request_reply %x error %d",
3346 lkb->lkb_id, result);
/* Handle unlock/cancel that overlapped the in-flight request. */
3349 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3350 log_debug(ls, "receive_request_reply %x result %d unlock",
3351 lkb->lkb_id, result);
3352 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3353 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3354 send_unlock(r, lkb);
3355 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3356 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3357 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3358 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3359 send_cancel(r, lkb);
3361 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3362 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
/* __receive_convert_reply: apply the master's do_convert() result
   (ms->m_result) to our process-copy lkb.  Cases (labels elided):
   -EAGAIN => would block; -EDEADLK => reverted; -EINPROGRESS => queued
   on the convert queue; 0 => granted. */
3370 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3371 struct dlm_message *ms)
3373 /* this is the value returned from do_convert() on the master */
3374 switch (ms->m_result) {
3376 /* convert would block (be queued) on remote master */
3377 queue_cast(r, lkb, -EAGAIN);
3381 receive_flags_reply(lkb, ms);
3382 revert_lock_pc(r, lkb);
3383 queue_cast(r, lkb, -EDEADLK);
3387 /* convert was queued on remote master */
3388 receive_flags_reply(lkb, ms);
3389 if (is_demoted(lkb))
3390 munge_demoted(lkb, ms);
3392 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3397 /* convert was granted on remote master */
3398 receive_flags_reply(lkb, ms);
3399 if (is_demoted(lkb))
3400 munge_demoted(lkb, ms);
3401 grant_lock_pc(r, lkb, ms);
3402 queue_cast(r, lkb, 0);
3406 log_error(r->res_ls, "receive_convert_reply %x error %d",
3407 lkb->lkb_id, ms->m_result);
/* _receive_convert_reply: take the lkb off the waiters list (the _ms
   variant works with a stub reply while waiters_mutex is held, per the
   comment) and hand off to __receive_convert_reply(). */
3411 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3413 struct dlm_rsb *r = lkb->lkb_resource;
3419 /* stub reply can happen with waiters_mutex held */
3420 error = remove_from_waiters_ms(lkb, ms);
3424 __receive_convert_reply(r, lkb, ms);

/* receive_convert_reply: message-entry wrapper — find the process-copy
   lkb by our lkid echoed in m_remid, then do the real work. */
3430 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3432 struct dlm_lkb *lkb;
3435 error = find_lkb(ls, ms->m_remid, &lkb);
3437 log_error(ls, "receive_convert_reply no lkb");
3440 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3442 _receive_convert_reply(lkb, ms);
/* _receive_unlock_reply: apply the master's do_unlock() result.  On
   -DLM_EUNLOCK (case label elided) the lock is removed locally and the
   completion ast queued; anything unexpected is logged. */
3446 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3448 struct dlm_rsb *r = lkb->lkb_resource;
3454 /* stub reply can happen with waiters_mutex held */
3455 error = remove_from_waiters_ms(lkb, ms);
3459 /* this is the value returned from do_unlock() on the master */
3461 switch (ms->m_result) {
3463 receive_flags_reply(lkb, ms);
3464 remove_lock_pc(r, lkb);
3465 queue_cast(r, lkb, -DLM_EUNLOCK);
3470 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3471 lkb->lkb_id, ms->m_result);

/* receive_unlock_reply: message-entry wrapper, mirrors
   receive_convert_reply(). */
3478 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3480 struct dlm_lkb *lkb;
3483 error = find_lkb(ls, ms->m_remid, &lkb);
3485 log_error(ls, "receive_unlock_reply no lkb");
3488 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3490 _receive_unlock_reply(lkb, ms);
/* _receive_cancel_reply: apply the master's do_cancel() result.  On
   -DLM_ECANCEL (case label elided) the lock is reverted locally and the
   completion ast queued with -DLM_ECANCEL. */
3494 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3496 struct dlm_rsb *r = lkb->lkb_resource;
3502 /* stub reply can happen with waiters_mutex held */
3503 error = remove_from_waiters_ms(lkb, ms);
3507 /* this is the value returned from do_cancel() on the master */
3509 switch (ms->m_result) {
3511 receive_flags_reply(lkb, ms);
3512 revert_lock_pc(r, lkb);
3514 queue_cast(r, lkb, -DLM_ECANCEL);
3519 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3520 lkb->lkb_id, ms->m_result);

/* receive_cancel_reply: message-entry wrapper, mirrors the other
   reply handlers. */
3527 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3529 struct dlm_lkb *lkb;
3532 error = find_lkb(ls, ms->m_remid, &lkb);
3534 log_error(ls, "receive_cancel_reply no lkb");
3537 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3539 _receive_cancel_reply(lkb, ms);
/* receive_lookup_reply: the directory node told us who masters this rsb
   (ms->m_nodeid).  Note the lkb is found by m_lkid (our id echoed back),
   not m_remid.  Record the master, then either retry the request via
   _request_lock() or, if an overlapping unlock/cancel arrived meanwhile,
   complete that instead.  Finally process other lkbs queued on the
   lookup (process_lookup_list). */
3543 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3545 struct dlm_lkb *lkb;
3547 int error, ret_nodeid;
3549 error = find_lkb(ls, ms->m_lkid, &lkb);
3551 log_error(ls, "receive_lookup_reply no lkb");
3555 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3556 FIXME: will a non-zero error ever be returned? */
3558 r = lkb->lkb_resource;
3562 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3566 ret_nodeid = ms->m_nodeid;
3567 if (ret_nodeid == dlm_our_nodeid()) {
3570 r->res_first_lkid = 0;
3572 /* set_master() will copy res_nodeid to lkb_nodeid */
3573 r->res_nodeid = ret_nodeid;
3576 if (is_overlap(lkb)) {
3577 log_debug(ls, "receive_lookup_reply %x unlock %x",
3578 lkb->lkb_id, lkb->lkb_flags);
3579 queue_cast_overlap(r, lkb);
3580 unhold_lkb(lkb); /* undoes create_lkb() */
3584 _request_lock(r, lkb);
3588 process_lookup_list(r);
/* dlm_receive_message: top-level dispatcher for every incoming DLM
   message.  Resolves the lockspace from the header, coordinates with
   recovery (queueing messages on the requestqueue while locking is
   stopped), then switches on m_type to the receive_xxxx() handler.
   Some case labels (GRANT/BAST/PURGE) are elided in this listing. */
3595 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3597 struct dlm_message *ms = (struct dlm_message *) hd;
3604 ls = dlm_find_lockspace_global(hd->h_lockspace);
3606 log_print("drop message %d from %d for unknown lockspace %d",
3607 ms->m_type, nodeid, hd->h_lockspace);
3611 /* recovery may have just ended leaving a bunch of backed-up requests
3612 in the requestqueue; wait while dlm_recoverd clears them */
3615 dlm_wait_requestqueue(ls);
3617 /* recovery may have just started while there were a bunch of
3618 in-flight requests -- save them in requestqueue to be processed
3619 after recovery. we can't let dlm_recvd block on the recovery
3620 lock. if dlm_recoverd is calling this function to clear the
3621 requestqueue, it needs to be interrupted (-EINTR) if another
3622 recovery operation is starting. */
3625 if (dlm_locking_stopped(ls)) {
3630 error = dlm_add_requestqueue(ls, nodeid, hd);
3631 if (error == -EAGAIN)
3639 if (dlm_lock_recovery_try(ls))
3644 switch (ms->m_type) {
3646 /* messages sent to a master node */
3648 case DLM_MSG_REQUEST:
3649 receive_request(ls, ms);
3652 case DLM_MSG_CONVERT:
3653 receive_convert(ls, ms);
3656 case DLM_MSG_UNLOCK:
3657 receive_unlock(ls, ms);
3660 case DLM_MSG_CANCEL:
3661 receive_cancel(ls, ms);
3664 /* messages sent from a master node (replies to above) */
3666 case DLM_MSG_REQUEST_REPLY:
3667 receive_request_reply(ls, ms);
3670 case DLM_MSG_CONVERT_REPLY:
3671 receive_convert_reply(ls, ms);
3674 case DLM_MSG_UNLOCK_REPLY:
3675 receive_unlock_reply(ls, ms);
3678 case DLM_MSG_CANCEL_REPLY:
3679 receive_cancel_reply(ls, ms);
3682 /* messages sent from a master node (only two types of async msg) */
3685 receive_grant(ls, ms);
3689 receive_bast(ls, ms);
3692 /* messages sent to a dir node */
3694 case DLM_MSG_LOOKUP:
3695 receive_lookup(ls, ms);
3698 case DLM_MSG_REMOVE:
3699 receive_remove(ls, ms);
3702 /* messages sent from a dir node (remove has no reply) */
3704 case DLM_MSG_LOOKUP_REPLY:
3705 receive_lookup_reply(ls, ms);
3708 /* other messages */
3711 receive_purge(ls, ms);
3715 log_error(ls, "unknown message type %d", ms->m_type);
3718 dlm_unlock_recovery(ls);
3720 dlm_put_lockspace(ls);
/* recover_convert_waiter: recovery handling for an lkb waiting on a
   CONVERT reply from a dead master.  Middle (PR<->CW) conversions are
   completed with a stub -EINPROGRESS reply and flagged for the
   recover_conversion pass; up-conversions are simply marked RESEND. */
3730 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3732 if (middle_conversion(lkb)) {
3734 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3735 ls->ls_stub_ms.m_result = -EINPROGRESS;
3736 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3737 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3739 /* Same special case as in receive_rcom_lock_args() */
3740 lkb->lkb_grmode = DLM_LOCK_IV;
3741 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3744 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3745 lkb->lkb_flags |= DLM_IFL_RESEND;
3748 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3749 conversions are async; there's no reply from the remote master */
3752 /* A waiting lkb needs recovery if the master node has failed, or
3753 the master node is changing (only when no directory is used) */
/* waiter_needs_recovery: true when the node we're waiting on has been
   removed, or (no-directory mode) when the dir nodeid for the resource
   no longer matches the lkb's master nodeid. */
3755 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3757 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3760 if (!dlm_no_directory(ls))
3763 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3769 /* Recovery for locks that are waiting for replies from nodes that are now
3770 gone. We can just complete unlocks and cancels by faking a reply from the
3771 dead node. Requests and up-conversions we flag to be resent after
3772 recovery. Down-conversions can just be completed with a fake reply like
3773 unlocks. Conversions between PR and CW need special attention. */
/* dlm_recover_waiters_pre: walk the waiters list at the start of
   recovery.  Lookups are always flagged RESEND; for waiters whose master
   is gone, requests/converts are resent or specially recovered, while
   unlocks and cancels are completed by faking a stub reply from the dead
   node (see the comment block above). */
3775 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3777 struct dlm_lkb *lkb, *safe;
3779 mutex_lock(&ls->ls_waiters_mutex);
3781 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3782 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3783 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3785 /* all outstanding lookups, regardless of destination will be
3786 resent after recovery is done */
3788 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3789 lkb->lkb_flags |= DLM_IFL_RESEND;
3793 if (!waiter_needs_recovery(ls, lkb))
3796 switch (lkb->lkb_wait_type) {
3798 case DLM_MSG_REQUEST:
3799 lkb->lkb_flags |= DLM_IFL_RESEND;
3802 case DLM_MSG_CONVERT:
3803 recover_convert_waiter(ls, lkb);
3806 case DLM_MSG_UNLOCK:
/* Fake a successful unlock reply from the dead master. */
3808 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3809 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3810 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3811 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3815 case DLM_MSG_CANCEL:
/* Fake a successful cancel reply from the dead master. */
3817 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3818 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3819 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3820 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3825 log_error(ls, "invalid lkb wait_type %d",
3826 lkb->lkb_wait_type);
3830 mutex_unlock(&ls->ls_waiters_mutex);
/* find_resend_waiter: pick the next waiter flagged DLM_IFL_RESEND by
   dlm_recover_waiters_pre(); returns it held, or NULL when none remain
   (the hold/return lines are elided in this listing). */
3833 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3835 struct dlm_lkb *lkb;
3838 mutex_lock(&ls->ls_waiters_mutex);
3839 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3840 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3846 mutex_unlock(&ls->ls_waiters_mutex);
3853 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3854 master or dir-node for r. Processing the lkb may result in it being placed
3857 /* We do this after normal locking has been enabled and any saved messages
3858 (in requestqueue) have been processed. We should be confident that at
3859 this point we won't get or process a reply to any of these waiting
3860 operations. But, new ops may be coming in on the rsbs/locks here from
3861 userspace or remotely. */
3863 /* there may have been an overlap unlock/cancel prior to recovery or after
3864 recovery. if before, the lkb may still have a positive wait_count; if after, the
3865 overlap flag would just have been set and nothing new sent. we can be
3866 confident here that any replies to either the initial op or overlap ops
3867 prior to recovery have been received. */
/* dlm_recover_waiters_post: after recovery, loop over RESEND-flagged
   waiters.  For each, clear all waiter/overlap state in one sweep, then
   either turn the pending op into the overlapping unlock/cancel the user
   asked for meanwhile, or resend the original op (_request_lock /
   _convert_lock etc.).  Aborts with -EINTR if recovery restarts. */
3869 int dlm_recover_waiters_post(struct dlm_ls *ls)
3871 struct dlm_lkb *lkb;
3873 int error = 0, mstype, err, oc, ou;
3876 if (dlm_locking_stopped(ls)) {
3877 log_debug(ls, "recover_waiters_post aborted");
3882 lkb = find_resend_waiter(ls);
3886 r = lkb->lkb_resource;
/* Snapshot wait type and overlap flags before clearing them below. */
3890 mstype = lkb->lkb_wait_type;
3891 oc = is_overlap_cancel(lkb);
3892 ou = is_overlap_unlock(lkb);
3895 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3896 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3898 /* At this point we assume that we won't get a reply to any
3899 previous op or overlap op on this lock. First, do a big
3900 remove_from_waiters() for all previous ops. */
3902 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3903 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3904 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3905 lkb->lkb_wait_type = 0;
3906 lkb->lkb_wait_count = 0;
3907 mutex_lock(&ls->ls_waiters_mutex);
3908 list_del_init(&lkb->lkb_wait_reply);
3909 mutex_unlock(&ls->ls_waiters_mutex);
3910 unhold_lkb(lkb); /* for waiters list */
3913 /* do an unlock or cancel instead of resending */
3915 case DLM_MSG_LOOKUP:
3916 case DLM_MSG_REQUEST:
3917 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3919 unhold_lkb(lkb); /* undoes create_lkb() */
3921 case DLM_MSG_CONVERT:
3923 queue_cast(r, lkb, -DLM_ECANCEL);
3925 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3926 _unlock_lock(r, lkb);
/* No overlap: resend the original operation. */
3934 case DLM_MSG_LOOKUP:
3935 case DLM_MSG_REQUEST:
3936 _request_lock(r, lkb);
3938 confirm_master(r, 0);
3940 case DLM_MSG_CONVERT:
3941 _convert_lock(r, lkb);
3949 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3950 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
/* purge_queue: remove and free every lkb on the given rsb queue for
   which the supplied predicate returns true, marking the rsb
   RSB_LOCKS_PURGED so dlm_grant_after_purge() revisits it. */
3959 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3960 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3962 struct dlm_ls *ls = r->res_ls;
3963 struct dlm_lkb *lkb, *safe;
3965 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3966 if (test(ls, lkb)) {
3967 rsb_set_flag(r, RSB_LOCKS_PURGED);
3969 /* this put should free the lkb */
3970 if (!dlm_put_lkb(lkb))
3971 log_error(ls, "purged lkb not released");
/* Predicates for purge_queue(): master copies whose owner node is gone,
   or all master copies. */
3976 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3978 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));

3981 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3983 return is_master_copy(lkb);

/* purge_dead_locks: drop master-copy locks held for removed nodes from
   all three rsb queues. */
3986 static void purge_dead_locks(struct dlm_rsb *r)
3988 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3989 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3990 purge_queue(r, &r->res_waitqueue, &purge_dead_test);

/* dlm_purge_mstcpy_locks: drop every master-copy lock on the rsb. */
3993 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3995 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3996 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3997 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4000 /* Get rid of locks held by nodes that are gone. */
/* dlm_purge_locks: walk every root rsb in the lockspace under
   ls_root_sem and purge locks held by nodes that are gone. */
4002 int dlm_purge_locks(struct dlm_ls *ls)
4006 log_debug(ls, "dlm_purge_locks");
4008 down_write(&ls->ls_root_sem);
4009 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4013 purge_dead_locks(r);
4019 up_write(&ls->ls_root_sem);
/* find_purged_rsb: scan one hash bucket for an rsb flagged
   RSB_LOCKS_PURGED, clear the flag and return it (held — the hold line
   is elided); NULL if the bucket has none. */
4024 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4026 struct dlm_rsb *r, *r_ret = NULL;
4028 read_lock(&ls->ls_rsbtbl[bucket].lock);
4029 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4030 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4033 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4037 read_unlock(&ls->ls_rsbtbl[bucket].lock);
/* dlm_grant_after_purge: after dead locks are purged, revisit every
   purged rsb (bucket by bucket) and grant whatever pending locks are
   now unblocked. */
4041 void dlm_grant_after_purge(struct dlm_ls *ls)
4047 r = find_purged_rsb(ls, bucket);
4049 if (bucket == ls->ls_rsbtbl_size - 1)
4056 grant_pending_locks(r);
4057 confirm_master(r, 0);
/* search_remid_list: find an lkb on one queue by (owner nodeid, remote
   lock id) pair; NULL if absent. */
4065 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4068 struct dlm_lkb *lkb;
4070 list_for_each_entry(lkb, head, lkb_statequeue) {
4071 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)

/* search_remid: try all three rsb queues (grant, convert, wait) in
   order. */
4077 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4080 struct dlm_lkb *lkb;
4082 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4085 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4088 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
/* receive_rcom_lock_args: rebuild a master-copy lkb from the rcom_lock
   payload a process-copy node sent during recovery.  The LVB length is
   whatever remains of the rcom message after the fixed headers.  Locks
   caught mid-conversion between PR and CW get the special
   RSB_RECOVER_CONVERT treatment noted below. */
4094 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4095 struct dlm_rsb *r, struct dlm_rcom *rc)
4097 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4100 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4101 lkb->lkb_ownpid = rl->rl_ownpid;
4102 lkb->lkb_remid = rl->rl_lkid;
4103 lkb->lkb_exflags = rl->rl_exflags;
4104 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4105 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4106 lkb->lkb_lvbseq = rl->rl_lvbseq;
4107 lkb->lkb_rqmode = rl->rl_rqmode;
4108 lkb->lkb_grmode = rl->rl_grmode;
4109 /* don't set lkb_status because add_lkb wants to itself */
4111 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4112 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4114 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4115 lkb->lkb_lvbptr = allocate_lvb(ls);
4116 if (!lkb->lkb_lvbptr)
4118 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4119 sizeof(struct rcom_lock);
4120 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4123 /* Conversions between PR and CW (middle modes) need special handling.
4124 The real granted mode of these converting locks cannot be determined
4125 until all locks have been rebuilt on the rsb (recover_conversion) */
4127 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4128 rl->rl_status = DLM_LKSTS_CONVERT;
4129 lkb->lkb_grmode = DLM_LOCK_IV;
4130 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4136 /* This lkb may have been recovered in a previous aborted recovery so we need
4137 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4138 If so we just send back a standard reply. If not, we create a new lkb with
4139 the given values and send back our lkid. We send back our lkid by sending
4140 back the rcom_lock struct we got but with the remid field filled in. */
/* dlm_recover_master_copy: recovery entry — adopt a lock sent by its
   (process-copy) holder now that we are the new master.  Dedupes against
   a previous aborted recovery via search_remid(); returns our lkid to
   the holder in rl->rl_remid, and any error in rl->rl_result.  Parent
   (hierarchical) locks are not supported. */
4142 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4144 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4146 struct dlm_lkb *lkb;
4149 if (rl->rl_parent_lkid) {
4150 error = -EOPNOTSUPP;
4154 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
4160 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
4166 error = create_lkb(ls, &lkb);
4170 error = receive_rcom_lock_args(ls, lkb, r, rc);
4177 add_lkb(r, lkb, rl->rl_status);
4181 /* this is the new value returned to the lock holder for
4182 saving in its process-copy lkb */
4183 rl->rl_remid = lkb->lkb_id;
4190 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
4191 rl->rl_result = error;
/* dlm_recover_process_copy: handle the new master's reply to the lock we
   sent it in recovery.  -EBADR means our rcom arrived before the master
   was ready — resend; -EEXIST means a prior aborted recovery already
   created it; success stores the master's lkid in lkb_remid.  Either way
   ack via dlm_recovered_lock() (case labels elided in this listing). */
4195 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4197 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4199 struct dlm_lkb *lkb;
4202 error = find_lkb(ls, rl->rl_lkid, &lkb);
4204 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
4208 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4210 error = rl->rl_result;
4212 r = lkb->lkb_resource;
4218 /* There's a chance the new master received our lock before
4219 dlm_recover_master_reply(), this wouldn't happen if we did
4220 a barrier between recover_masters and recover_locks. */
4221 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4222 (unsigned long)r, r->res_name);
4223 dlm_send_rcom_lock(r, lkb);
4226 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4229 lkb->lkb_remid = rl->rl_remid;
4232 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4233 error, lkb->lkb_id);
4236 /* an ack for dlm_recover_locks() which waits for replies from
4237 all the locks it sends to new masters */
4238 dlm_recovered_lock(r);
/* dlm_user_request: userspace (dlm_device) entry for acquiring a new
   lock.  Creates an lkb, optionally allocates a user LVB, attaches the
   dlm_user_args as the ast param (freed later by free_lkb, per the
   comment), runs the normal request_lock() path, and links the lkb onto
   the owning process's lock list. */
4247 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4248 int mode, uint32_t flags, void *name, unsigned int namelen,
4249 unsigned long timeout_cs)
4251 struct dlm_lkb *lkb;
4252 struct dlm_args args;
4255 dlm_lock_recovery(ls);
4257 error = create_lkb(ls, &lkb);
4263 if (flags & DLM_LKF_VALBLK) {
4264 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4265 if (!ua->lksb.sb_lvbptr) {
4273 /* After ua is attached to lkb it will be freed by free_lkb().
4274 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4275 lock and that lkb_astparam is the dlm_user_args structure. */
4277 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4278 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4279 lkb->lkb_flags |= DLM_IFL_USER;
4280 ua->old_mode = DLM_LOCK_IV;
4287 error = request_lock(ls, lkb, name, namelen, &args);
4303 /* add this new lkb to the per-process list of locks */
4304 spin_lock(&ua->proc->locks_spin);
4306 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4307 spin_unlock(&ua->proc->locks_spin);
4309 dlm_unlock_recovery(ls);
/* dlm_user_convert: userspace entry for converting an existing lock.
   Refreshes the stored dlm_user_args from the caller's copy (ast
   params/addresses, lksb, xid), allows adding an LVB that didn't exist
   before, then runs convert_lock().  -EINPROGRESS/-EAGAIN/-EDEADLK are
   normal outcomes, not failures. */
4313 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4314 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4315 unsigned long timeout_cs)
4317 struct dlm_lkb *lkb;
4318 struct dlm_args args;
4319 struct dlm_user_args *ua;
4322 dlm_lock_recovery(ls);
4324 error = find_lkb(ls, lkid, &lkb);
4328 /* user can change the params on its lock when it converts it, or
4329 add an lvb that didn't exist before */
4331 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4333 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4334 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4335 if (!ua->lksb.sb_lvbptr) {
4340 if (lvb_in && ua->lksb.sb_lvbptr)
4341 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4343 ua->xid = ua_tmp->xid;
4344 ua->castparam = ua_tmp->castparam;
4345 ua->castaddr = ua_tmp->castaddr;
4346 ua->bastparam = ua_tmp->bastparam;
4347 ua->bastaddr = ua_tmp->bastaddr;
4348 ua->user_lksb = ua_tmp->user_lksb;
4349 ua->old_mode = lkb->lkb_grmode;
4351 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4352 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4356 error = convert_lock(ls, lkb, &args);
4358 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4363 dlm_unlock_recovery(ls);
/* dlm_user_unlock: userspace entry for unlocking.  Copies in any final
   LVB, refreshes the completion-ast params, runs unlock_lock()
   (-DLM_EUNLOCK is success; -EBUSY with FORCEUNLOCK is tolerated), then
   moves the lkb to the process's "unlocking" list unless
   dlm_user_add_ast() already detached it. */
4368 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4369 uint32_t flags, uint32_t lkid, char *lvb_in)
4371 struct dlm_lkb *lkb;
4372 struct dlm_args args;
4373 struct dlm_user_args *ua;
4376 dlm_lock_recovery(ls);
4378 error = find_lkb(ls, lkid, &lkb);
4382 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4384 if (lvb_in && ua->lksb.sb_lvbptr)
4385 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4386 ua->castparam = ua_tmp->castparam;
4387 ua->user_lksb = ua_tmp->user_lksb;
4389 error = set_unlock_args(flags, ua, &args);
4393 error = unlock_lock(ls, lkb, &args);
4395 if (error == -DLM_EUNLOCK)
4397 /* from validate_unlock_args() */
4398 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4403 spin_lock(&ua->proc->locks_spin);
4404 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4405 if (!list_empty(&lkb->lkb_ownqueue))
4406 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4407 spin_unlock(&ua->proc->locks_spin);
4411 dlm_unlock_recovery(ls);
/* dlm_user_cancel - device-write path for dlm_unlock(DLM_LKF_CANCEL):
 * cancel an in-progress request or convert on a user-space lock.
 *
 * NOTE(review): error-check lines after find_lkb()/set_unlock_args() are
 * not visible in this chunk.
 */
4416 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4417 uint32_t flags, uint32_t lkid)
4419 struct dlm_lkb *lkb;
4420 struct dlm_args args;
4421 struct dlm_user_args *ua;
/* hold off recovery while we operate on the lockspace */
4424 dlm_lock_recovery(ls);
4426 error = find_lkb(ls, lkid, &lkb);
/* refresh the lkb's persistent user args from the per-call copy */
4430 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4431 ua->castparam = ua_tmp->castparam;
4432 ua->user_lksb = ua_tmp->user_lksb;
4434 error = set_unlock_args(flags, ua, &args);
4438 error = cancel_lock(ls, lkb, &args);
/* -DLM_ECANCEL is the normal "cancelled" status; -EBUSY means there was
   nothing in progress to cancel */
4440 if (error == -DLM_ECANCEL)
4442 /* from validate_unlock_args() */
4443 if (error == -EBUSY)
4448 dlm_unlock_recovery(ls);
4453 /* lkb's that are removed from the waiters list by revert are just left on the
4454 orphans list with the granted orphan locks, to be freed by purge */
/* orphan_proc_lock - called during process-exit cleanup for a lock with
 * DLM_LKF_PERSISTENT set: park the lkb on the lockspace orphans list
 * (freed later by purge, per the comment above) and cancel any
 * request/convert still in progress for it. */
4456 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4458 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4459 struct dlm_args args;
4463 mutex_lock(&ls->ls_orphans_mutex);
4464 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4465 mutex_unlock(&ls->ls_orphans_mutex);
/* flags 0: a plain cancel of whatever operation is outstanding */
4467 set_unlock_args(0, ua, &args);
4469 error = cancel_lock(ls, lkb, &args);
4470 if (error == -DLM_ECANCEL)
4475 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4476 Regardless of what rsb queue the lock is on, it's removed and freed. */
/* unlock_proc_lock - force-unlock a lock belonging to an exiting (or
 * purged) process.  DLM_LKF_FORCEUNLOCK lets the unlock proceed even if
 * the lkb isn't granted (see the comment above). */
4478 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4480 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4481 struct dlm_args args;
4484 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4486 error = unlock_lock(ls, lkb, &args);
/* -DLM_EUNLOCK is the normal "unlocked" status */
4487 if (error == -DLM_EUNLOCK)
4492 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4493 (which does lock_rsb) due to deadlock with receiving a message that does
4494 lock_rsb followed by dlm_user_add_ast() */
/* del_proc_lock - detach and return the first lkb on the proc's lock
 * list, under ls_clear_proc_locks (the early-exit path for an empty
 * list is on a line not visible in this chunk).  Persistent locks are
 * flagged ORPHAN so the caller keeps them around; all others are
 * flagged DEAD. */
4496 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4497 struct dlm_user_proc *proc)
4499 struct dlm_lkb *lkb = NULL;
4501 mutex_lock(&ls->ls_clear_proc_locks);
4502 if (list_empty(&proc->locks))
4505 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4506 list_del_init(&lkb->lkb_ownqueue);
/* persistent locks survive process exit as orphans; others die */
4508 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4509 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4511 lkb->lkb_flags |= DLM_IFL_DEAD;
4513 mutex_unlock(&ls->ls_clear_proc_locks);
4517 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4518 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4519 which we clear here. */
4521 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4522 list, and no more device_writes should add lkb's to proc->locks list; so we
4523 shouldn't need to take asts_spin or locks_spin here. this assumes that
4524 device reads/writes/closes are serialized -- FIXME: we may need to serialize them as well */
/* dlm_clear_proc_locks - release everything a user process owned when
 * its device fd is closed: persistent locks become orphans, all others
 * are force-unlocked, and in-flight unlocks plus queued asts are
 * discarded.  The loop-control lines around del_proc_lock() are not
 * visible in this chunk. */
4527 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4529 struct dlm_lkb *lkb, *safe;
4531 dlm_lock_recovery(ls);
/* pop locks one at a time; del_proc_lock() releases ls_clear_proc_locks
   between iterations (see the deadlock note above del_proc_lock) */
4534 lkb = del_proc_lock(ls, proc);
4537 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4538 orphan_proc_lock(ls, lkb);
4540 unlock_proc_lock(ls, lkb);
4542 /* this removes the reference for the proc->locks list
4543 added by dlm_user_request, it may result in the lkb being freed and then accessed */
4549 mutex_lock(&ls->ls_clear_proc_locks);
4551 /* in-progress unlocks */
4552 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4553 list_del_init(&lkb->lkb_ownqueue);
4554 lkb->lkb_flags |= DLM_IFL_DEAD;
/* drop asts queued for delivery that the process will never read */
4558 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4559 list_del(&lkb->lkb_astqueue);
4563 mutex_unlock(&ls->ls_clear_proc_locks);
4564 dlm_unlock_recovery(ls);
/* purge_proc_locks - purge-time counterpart of dlm_clear_proc_locks():
 * drop every lock, in-progress unlock, and queued ast belonging to this
 * proc, using the proc spinlocks rather than ls_clear_proc_locks.
 * NOTE(review): loop-control lines (and the else/break around the empty
 * check) are not visible in this chunk. */
4567 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4569 struct dlm_lkb *lkb, *safe;
/* take one lkb at a time off proc->locks under the spinlock, then
   force-unlock it outside the lock */
4573 spin_lock(&proc->locks_spin);
4574 if (!list_empty(&proc->locks)) {
4575 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4577 list_del_init(&lkb->lkb_ownqueue);
4579 spin_unlock(&proc->locks_spin);
4584 lkb->lkb_flags |= DLM_IFL_DEAD;
4585 unlock_proc_lock(ls, lkb);
4586 dlm_put_lkb(lkb); /* ref from proc->locks list */
/* abandon unlocks that were still in flight */
4589 spin_lock(&proc->locks_spin);
4590 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4591 list_del_init(&lkb->lkb_ownqueue);
4592 lkb->lkb_flags |= DLM_IFL_DEAD;
4595 spin_unlock(&proc->locks_spin);
/* discard asts queued for delivery to the process */
4597 spin_lock(&proc->asts_spin);
4598 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4599 list_del(&lkb->lkb_astqueue);
4602 spin_unlock(&proc->asts_spin);
4605 /* pid of 0 means purge all orphans */
/* do_purge - force-unlock and unlink orphaned locks on this node that
 * match the given pid; a pid of 0 matches every orphan (see comment
 * above).  The `continue` for non-matching pids and the final put of
 * the lkb are on lines not visible in this chunk. */
4607 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4609 struct dlm_lkb *lkb, *safe;
4611 mutex_lock(&ls->ls_orphans_mutex);
4612 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
/* skip orphans owned by other pids when a specific pid was given */
4613 if (pid && lkb->lkb_ownpid != pid)
4615 unlock_proc_lock(ls, lkb);
4616 list_del_init(&lkb->lkb_ownqueue);
4619 mutex_unlock(&ls->ls_orphans_mutex);
/* send_purge - build and send a DLM_MSG_PURGE message asking the given
 * remote node to purge orphans for this pid.
 * NOTE(review): presumably ms->m_pid is also set here; that line is not
 * visible in this chunk -- confirm against the full source. */
4622 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4624 struct dlm_message *ms;
4625 struct dlm_mhandle *mh;
4628 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4629 DLM_MSG_PURGE, &ms, &mh);
4632 ms->m_nodeid = nodeid;
4635 return send_message(mh, ms);
4638 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4639 int nodeid, int pid)
4643 if (nodeid != dlm_our_nodeid()) {
4644 error = send_purge(ls, nodeid, pid);
4646 dlm_lock_recovery(ls);
4647 if (pid == current->pid)
4648 purge_proc_locks(ls, proc);
4650 do_purge(ls, nodeid, pid);
4651 dlm_unlock_recovery(ls);