1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
10 *******************************************************************************
11 ******************************************************************************/
/* Central locking logic has four stages:

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
63 #include "requestqueue.h"
67 #include "lockspace.h"
72 #include "lvb_table.h"
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
89 #define FAKE_USER_AST (void*)0xff00ff00
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

/* Compatibility of an existing lock's granted mode (gr) with a new/convert
   request's requested mode (rq); 1 means the two can coexist. */

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
158 void dlm_print_lkb(struct dlm_lkb *lkb)
160 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
161 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
162 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
163 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
164 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
167 void dlm_print_rsb(struct dlm_rsb *r)
169 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
170 r->res_nodeid, r->res_flags, r->res_first_lkid,
171 r->res_recover_locks_count, r->res_name);
174 /* Threads cannot use the lockspace while it's being recovered */
176 static inline void lock_recovery(struct dlm_ls *ls)
178 down_read(&ls->ls_in_recovery);
181 static inline void unlock_recovery(struct dlm_ls *ls)
183 up_read(&ls->ls_in_recovery);
186 static inline int lock_recovery_try(struct dlm_ls *ls)
188 return down_read_trylock(&ls->ls_in_recovery);
191 static inline int can_be_queued(struct dlm_lkb *lkb)
193 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
196 static inline int force_blocking_asts(struct dlm_lkb *lkb)
198 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
201 static inline int is_demoted(struct dlm_lkb *lkb)
203 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
206 static inline int is_remote(struct dlm_rsb *r)
208 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
209 return !!r->res_nodeid;
212 static inline int is_process_copy(struct dlm_lkb *lkb)
214 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
217 static inline int is_master_copy(struct dlm_lkb *lkb)
219 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
220 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
221 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
224 static inline int middle_conversion(struct dlm_lkb *lkb)
226 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
227 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
232 static inline int down_conversion(struct dlm_lkb *lkb)
234 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
237 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
239 if (is_master_copy(lkb))
242 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
244 lkb->lkb_lksb->sb_status = rv;
245 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
247 dlm_add_ast(lkb, AST_COMP);
250 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
252 if (is_master_copy(lkb))
253 send_bast(r, lkb, rqmode);
255 lkb->lkb_bastmode = rqmode;
256 dlm_add_ast(lkb, AST_BAST);
261 * Basic operations on rsb's and lkb's
264 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
268 r = allocate_rsb(ls, len);
274 memcpy(r->res_name, name, len);
275 mutex_init(&r->res_mutex);
277 INIT_LIST_HEAD(&r->res_lookup);
278 INIT_LIST_HEAD(&r->res_grantqueue);
279 INIT_LIST_HEAD(&r->res_convertqueue);
280 INIT_LIST_HEAD(&r->res_waitqueue);
281 INIT_LIST_HEAD(&r->res_root_list);
282 INIT_LIST_HEAD(&r->res_recover_list);
287 static int search_rsb_list(struct list_head *head, char *name, int len,
288 unsigned int flags, struct dlm_rsb **r_ret)
293 list_for_each_entry(r, head, res_hashchain) {
294 if (len == r->res_length && !memcmp(name, r->res_name, len))
300 if (r->res_nodeid && (flags & R_MASTER))
306 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
307 unsigned int flags, struct dlm_rsb **r_ret)
312 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
314 kref_get(&r->res_ref);
317 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
321 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
323 if (dlm_no_directory(ls))
326 if (r->res_nodeid == -1) {
327 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
328 r->res_first_lkid = 0;
329 } else if (r->res_nodeid > 0) {
330 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
331 r->res_first_lkid = 0;
333 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
334 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
341 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
342 unsigned int flags, struct dlm_rsb **r_ret)
345 write_lock(&ls->ls_rsbtbl[b].lock);
346 error = _search_rsb(ls, name, len, b, flags, r_ret);
347 write_unlock(&ls->ls_rsbtbl[b].lock);
352 * Find rsb in rsbtbl and potentially create/add one
354 * Delaying the release of rsb's has a similar benefit to applications keeping
355 * NL locks on an rsb, but without the guarantee that the cached master value
356 * will still be valid when the rsb is reused. Apps aren't always smart enough
357 * to keep NL locks on an rsb that they may lock again shortly; this can lead
358 * to excessive master lookups and removals if we don't delay the release.
360 * Searching for an rsb means looking through both the normal list and toss
361 * list. When found on the toss list the rsb is moved to the normal list with
362 * ref count of 1; when found on normal list the ref count is incremented.
365 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
366 unsigned int flags, struct dlm_rsb **r_ret)
368 struct dlm_rsb *r, *tmp;
369 uint32_t hash, bucket;
372 if (dlm_no_directory(ls))
375 hash = jhash(name, namelen, 0);
376 bucket = hash & (ls->ls_rsbtbl_size - 1);
378 error = search_rsb(ls, name, namelen, bucket, flags, &r);
382 if (error == -EBADR && !(flags & R_CREATE))
385 /* the rsb was found but wasn't a master copy */
386 if (error == -ENOTBLK)
390 r = create_rsb(ls, name, namelen);
395 r->res_bucket = bucket;
397 kref_init(&r->res_ref);
399 /* With no directory, the master can be set immediately */
400 if (dlm_no_directory(ls)) {
401 int nodeid = dlm_dir_nodeid(r);
402 if (nodeid == dlm_our_nodeid())
404 r->res_nodeid = nodeid;
407 write_lock(&ls->ls_rsbtbl[bucket].lock);
408 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
410 write_unlock(&ls->ls_rsbtbl[bucket].lock);
415 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
416 write_unlock(&ls->ls_rsbtbl[bucket].lock);
423 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
424 unsigned int flags, struct dlm_rsb **r_ret)
426 return find_rsb(ls, name, namelen, flags, r_ret);
429 /* This is only called to add a reference when the code already holds
430 a valid reference to the rsb, so there's no need for locking. */
432 static inline void hold_rsb(struct dlm_rsb *r)
434 kref_get(&r->res_ref);
437 void dlm_hold_rsb(struct dlm_rsb *r)
442 static void toss_rsb(struct kref *kref)
444 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
445 struct dlm_ls *ls = r->res_ls;
447 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
448 kref_init(&r->res_ref);
449 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
450 r->res_toss_time = jiffies;
452 free_lvb(r->res_lvbptr);
453 r->res_lvbptr = NULL;
457 /* When all references to the rsb are gone it's transfered to
458 the tossed list for later disposal. */
460 static void put_rsb(struct dlm_rsb *r)
462 struct dlm_ls *ls = r->res_ls;
463 uint32_t bucket = r->res_bucket;
465 write_lock(&ls->ls_rsbtbl[bucket].lock);
466 kref_put(&r->res_ref, toss_rsb);
467 write_unlock(&ls->ls_rsbtbl[bucket].lock);
470 void dlm_put_rsb(struct dlm_rsb *r)
475 /* See comment for unhold_lkb */
477 static void unhold_rsb(struct dlm_rsb *r)
480 rv = kref_put(&r->res_ref, toss_rsb);
481 DLM_ASSERT(!rv, dlm_print_rsb(r););
484 static void kill_rsb(struct kref *kref)
486 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
488 /* All work is done after the return from kref_put() so we
489 can release the write_lock before the remove and free. */
491 DLM_ASSERT(list_empty(&r->res_lookup),);
492 DLM_ASSERT(list_empty(&r->res_grantqueue),);
493 DLM_ASSERT(list_empty(&r->res_convertqueue),);
494 DLM_ASSERT(list_empty(&r->res_waitqueue),);
495 DLM_ASSERT(list_empty(&r->res_root_list),);
496 DLM_ASSERT(list_empty(&r->res_recover_list),);
499 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
500 The rsb must exist as long as any lkb's for it do. */
502 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
505 lkb->lkb_resource = r;
508 static void detach_lkb(struct dlm_lkb *lkb)
510 if (lkb->lkb_resource) {
511 put_rsb(lkb->lkb_resource);
512 lkb->lkb_resource = NULL;
516 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
518 struct dlm_lkb *lkb, *tmp;
522 lkb = allocate_lkb(ls);
526 lkb->lkb_nodeid = -1;
527 lkb->lkb_grmode = DLM_LOCK_IV;
528 kref_init(&lkb->lkb_ref);
530 get_random_bytes(&bucket, sizeof(bucket));
531 bucket &= (ls->ls_lkbtbl_size - 1);
533 write_lock(&ls->ls_lkbtbl[bucket].lock);
535 /* counter can roll over so we must verify lkid is not in use */
538 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
540 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
542 if (tmp->lkb_id != lkid)
550 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
551 write_unlock(&ls->ls_lkbtbl[bucket].lock);
557 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
559 uint16_t bucket = lkid & 0xFFFF;
562 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
563 if (lkb->lkb_id == lkid)
569 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
572 uint16_t bucket = lkid & 0xFFFF;
574 if (bucket >= ls->ls_lkbtbl_size)
577 read_lock(&ls->ls_lkbtbl[bucket].lock);
578 lkb = __find_lkb(ls, lkid);
580 kref_get(&lkb->lkb_ref);
581 read_unlock(&ls->ls_lkbtbl[bucket].lock);
584 return lkb ? 0 : -ENOENT;
587 static void kill_lkb(struct kref *kref)
589 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
591 /* All work is done after the return from kref_put() so we
592 can release the write_lock before the detach_lkb */
594 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
597 /* __put_lkb() is used when an lkb may not have an rsb attached to
598 it so we need to provide the lockspace explicitly */
600 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
602 uint16_t bucket = lkb->lkb_id & 0xFFFF;
604 write_lock(&ls->ls_lkbtbl[bucket].lock);
605 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
606 list_del(&lkb->lkb_idtbl_list);
607 write_unlock(&ls->ls_lkbtbl[bucket].lock);
611 /* for local/process lkbs, lvbptr points to caller's lksb */
612 if (lkb->lkb_lvbptr && is_master_copy(lkb))
613 free_lvb(lkb->lkb_lvbptr);
617 write_unlock(&ls->ls_lkbtbl[bucket].lock);
622 int dlm_put_lkb(struct dlm_lkb *lkb)
626 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
627 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
629 ls = lkb->lkb_resource->res_ls;
630 return __put_lkb(ls, lkb);
633 /* This is only called to add a reference when the code already holds
634 a valid reference to the lkb, so there's no need for locking. */
636 static inline void hold_lkb(struct dlm_lkb *lkb)
638 kref_get(&lkb->lkb_ref);
641 /* This is called when we need to remove a reference and are certain
642 it's not the last ref. e.g. del_lkb is always called between a
643 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
644 put_lkb would work fine, but would involve unnecessary locking */
646 static inline void unhold_lkb(struct dlm_lkb *lkb)
649 rv = kref_put(&lkb->lkb_ref, kill_lkb);
650 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
653 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
656 struct dlm_lkb *lkb = NULL;
658 list_for_each_entry(lkb, head, lkb_statequeue)
659 if (lkb->lkb_rqmode < mode)
663 list_add_tail(new, head);
665 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
668 /* add/remove lkb to rsb's grant/convert/wait queue */
670 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
672 kref_get(&lkb->lkb_ref);
674 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
676 lkb->lkb_status = status;
679 case DLM_LKSTS_WAITING:
680 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
681 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
683 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
685 case DLM_LKSTS_GRANTED:
686 /* convention says granted locks kept in order of grmode */
687 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
690 case DLM_LKSTS_CONVERT:
691 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
692 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
694 list_add_tail(&lkb->lkb_statequeue,
695 &r->res_convertqueue);
698 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
702 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
705 list_del(&lkb->lkb_statequeue);
709 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
713 add_lkb(r, lkb, sts);
717 /* add/remove lkb from global waiters list of lkb's waiting for
718 a reply from a remote node */
720 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
722 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
724 mutex_lock(&ls->ls_waiters_mutex);
725 if (lkb->lkb_wait_type) {
726 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
729 lkb->lkb_wait_type = mstype;
730 kref_get(&lkb->lkb_ref);
731 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
733 mutex_unlock(&ls->ls_waiters_mutex);
736 static int _remove_from_waiters(struct dlm_lkb *lkb)
740 if (!lkb->lkb_wait_type) {
741 log_print("remove_from_waiters error");
745 lkb->lkb_wait_type = 0;
746 list_del(&lkb->lkb_wait_reply);
752 static int remove_from_waiters(struct dlm_lkb *lkb)
754 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
757 mutex_lock(&ls->ls_waiters_mutex);
758 error = _remove_from_waiters(lkb);
759 mutex_unlock(&ls->ls_waiters_mutex);
763 static void dir_remove(struct dlm_rsb *r)
767 if (dlm_no_directory(r->res_ls))
770 to_nodeid = dlm_dir_nodeid(r);
771 if (to_nodeid != dlm_our_nodeid())
774 dlm_dir_remove_entry(r->res_ls, to_nodeid,
775 r->res_name, r->res_length);
778 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
779 found since they are in order of newest to oldest? */
781 static int shrink_bucket(struct dlm_ls *ls, int b)
784 int count = 0, found;
788 write_lock(&ls->ls_rsbtbl[b].lock);
789 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
791 if (!time_after_eq(jiffies, r->res_toss_time +
792 dlm_config.toss_secs * HZ))
799 write_unlock(&ls->ls_rsbtbl[b].lock);
803 if (kref_put(&r->res_ref, kill_rsb)) {
804 list_del(&r->res_hashchain);
805 write_unlock(&ls->ls_rsbtbl[b].lock);
812 write_unlock(&ls->ls_rsbtbl[b].lock);
813 log_error(ls, "tossed rsb in use %s", r->res_name);
820 void dlm_scan_rsbs(struct dlm_ls *ls)
824 if (dlm_locking_stopped(ls))
827 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
828 shrink_bucket(ls, i);
833 /* lkb is master or local copy */
835 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
837 int b, len = r->res_ls->ls_lvblen;
839 /* b=1 lvb returned to caller
840 b=0 lvb written to rsb or invalidated
843 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
846 if (!lkb->lkb_lvbptr)
849 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
855 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
856 lkb->lkb_lvbseq = r->res_lvbseq;
859 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
860 rsb_set_flag(r, RSB_VALNOTVALID);
864 if (!lkb->lkb_lvbptr)
867 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
871 r->res_lvbptr = allocate_lvb(r->res_ls);
876 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
878 lkb->lkb_lvbseq = r->res_lvbseq;
879 rsb_clear_flag(r, RSB_VALNOTVALID);
882 if (rsb_flag(r, RSB_VALNOTVALID))
883 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
886 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
888 if (lkb->lkb_grmode < DLM_LOCK_PW)
891 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
892 rsb_set_flag(r, RSB_VALNOTVALID);
896 if (!lkb->lkb_lvbptr)
899 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
903 r->res_lvbptr = allocate_lvb(r->res_ls);
908 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
910 rsb_clear_flag(r, RSB_VALNOTVALID);
913 /* lkb is process copy (pc) */
915 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
916 struct dlm_message *ms)
920 if (!lkb->lkb_lvbptr)
923 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
926 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
928 int len = receive_extralen(ms);
929 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
930 lkb->lkb_lvbseq = ms->m_lvbseq;
934 /* Manipulate lkb's on rsb's convert/granted/waiting queues
935 remove_lock -- used for unlock, removes lkb from granted
936 revert_lock -- used for cancel, moves lkb from convert to granted
937 grant_lock -- used for request and convert, adds lkb to granted or
938 moves lkb from convert or waiting to granted
940 Each of these is used for master or local copy lkb's. There is
941 also a _pc() variation used to make the corresponding change on
942 a process copy (pc) lkb. */
944 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
947 lkb->lkb_grmode = DLM_LOCK_IV;
948 /* this unhold undoes the original ref from create_lkb()
949 so this leads to the lkb being freed */
953 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
955 set_lvb_unlock(r, lkb);
956 _remove_lock(r, lkb);
959 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
961 _remove_lock(r, lkb);
964 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
966 lkb->lkb_rqmode = DLM_LOCK_IV;
968 switch (lkb->lkb_status) {
969 case DLM_LKSTS_GRANTED:
971 case DLM_LKSTS_CONVERT:
972 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
974 case DLM_LKSTS_WAITING:
976 lkb->lkb_grmode = DLM_LOCK_IV;
977 /* this unhold undoes the original ref from create_lkb()
978 so this leads to the lkb being freed */
982 log_print("invalid status for revert %d", lkb->lkb_status);
986 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
991 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
993 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
994 lkb->lkb_grmode = lkb->lkb_rqmode;
996 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
998 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1001 lkb->lkb_rqmode = DLM_LOCK_IV;
1004 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1006 set_lvb_lock(r, lkb);
1007 _grant_lock(r, lkb);
1008 lkb->lkb_highbast = 0;
1011 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1012 struct dlm_message *ms)
1014 set_lvb_lock_pc(r, lkb, ms);
1015 _grant_lock(r, lkb);
1018 /* called by grant_pending_locks() which means an async grant message must
1019 be sent to the requesting node in addition to granting the lock if the
1020 lkb belongs to a remote node. */
1022 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1025 if (is_master_copy(lkb))
1028 queue_cast(r, lkb, 0);
1031 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1033 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1035 if (lkb->lkb_id == first->lkb_id)
1041 /* Check if the given lkb conflicts with another lkb on the queue. */
1043 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1045 struct dlm_lkb *this;
1047 list_for_each_entry(this, head, lkb_statequeue) {
1050 if (!modes_compat(this, lkb))
1057 * "A conversion deadlock arises with a pair of lock requests in the converting
1058 * queue for one resource. The granted mode of each lock blocks the requested
1059 * mode of the other lock."
1061 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1062 * convert queue from being granted, then demote lkb (set grmode to NL).
1063 * This second form requires that we check for conv-deadlk even when
1064 * now == 0 in _can_be_granted().
1067 * Granted Queue: empty
1068 * Convert Queue: NL->EX (first lock)
1069 * PR->EX (second lock)
1071 * The first lock can't be granted because of the granted mode of the second
1072 * lock and the second lock can't be granted because it's not first in the
1073 * list. We demote the granted mode of the second lock (the lkb passed to this
1076 * After the resolution, the "grant pending" function needs to go back and try
1077 * to grant locks on the convert queue again since the first lock can now be
1081 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1083 struct dlm_lkb *this, *first = NULL, *self = NULL;
1085 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1093 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1097 /* if lkb is on the convert queue and is preventing the first
1098 from being granted, then there's deadlock and we demote lkb.
1099 multiple converting locks may need to do this before the first
1100 converting lock can be granted. */
1102 if (self && self != first) {
1103 if (!modes_compat(lkb, first) &&
1104 !queue_conflict(&rsb->res_grantqueue, first))
1112 * Return 1 if the lock can be granted, 0 otherwise.
1113 * Also detect and resolve conversion deadlocks.
1115 * lkb is the lock to be granted
1117 * now is 1 if the function is being called in the context of the
1118 * immediate request, it is 0 if called later, after the lock has been
1121 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1124 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1126 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1129 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1130 * a new request for a NL mode lock being blocked.
1132 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1133 * request, then it would be granted. In essence, the use of this flag
1134 * tells the Lock Manager to expedite theis request by not considering
1135 * what may be in the CONVERTING or WAITING queues... As of this
1136 * writing, the EXPEDITE flag can be used only with new requests for NL
1137 * mode locks. This flag is not valid for conversion requests.
1139 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1140 * conversion or used with a non-NL requested mode. We also know an
1141 * EXPEDITE request is always granted immediately, so now must always
1142 * be 1. The full condition to grant an expedite request: (now &&
1143 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1144 * therefore be shortened to just checking the flag.
1147 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1151 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1152 * added to the remaining conditions.
1155 if (queue_conflict(&r->res_grantqueue, lkb))
1159 * 6-3: By default, a conversion request is immediately granted if the
1160 * requested mode is compatible with the modes of all other granted
1164 if (queue_conflict(&r->res_convertqueue, lkb))
1168 * 6-5: But the default algorithm for deciding whether to grant or
1169 * queue conversion requests does not by itself guarantee that such
1170 * requests are serviced on a "first come first serve" basis. This, in
1171 * turn, can lead to a phenomenon known as "indefinate postponement".
1173 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1174 * the system service employed to request a lock conversion. This flag
1175 * forces certain conversion requests to be queued, even if they are
1176 * compatible with the granted modes of other locks on the same
1177 * resource. Thus, the use of this flag results in conversion requests
1178 * being ordered on a "first come first servce" basis.
1180 * DCT: This condition is all about new conversions being able to occur
1181 * "in place" while the lock remains on the granted queue (assuming
1182 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1183 * doesn't _have_ to go onto the convert queue where it's processed in
1184 * order. The "now" variable is necessary to distinguish converts
1185 * being received and processed for the first time now, because once a
1186 * convert is moved to the conversion queue the condition below applies
1187 * requiring fifo granting.
1190 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1194 * The NOORDER flag is set to avoid the standard vms rules on grant
1198 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1202 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1203 * granted until all other conversion requests ahead of it are granted
1207 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1211 * 6-4: By default, a new request is immediately granted only if all
1212 * three of the following conditions are satisfied when the request is
1214 * - The queue of ungranted conversion requests for the resource is
1216 * - The queue of ungranted new requests for the resource is empty.
1217 * - The mode of the new request is compatible with the most
1218 * restrictive mode of all granted locks on the resource.
1221 if (now && !conv && list_empty(&r->res_convertqueue) &&
1222 list_empty(&r->res_waitqueue))
1226 * 6-4: Once a lock request is in the queue of ungranted new requests,
1227 * it cannot be granted until the queue of ungranted conversion
1228 * requests is empty, all ungranted new requests ahead of it are
1229 * granted and/or canceled, and it is compatible with the granted mode
1230 * of the most restrictive lock granted on the resource.
1233 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1234 first_in_list(lkb, &r->res_waitqueue))
1239 * The following, enabled by CONVDEADLK, departs from VMS.
1242 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1243 conversion_deadlock_detect(r, lkb)) {
1244 lkb->lkb_grmode = DLM_LOCK_NL;
1245 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1252 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1253 * simple way to provide a big optimization to applications that can use them.
1256 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1258 uint32_t flags = lkb->lkb_exflags;
1260 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1262 rv = _can_be_granted(r, lkb, now);
1266 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1269 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1271 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1275 lkb->lkb_rqmode = alt;
1276 rv = _can_be_granted(r, lkb, now);
1278 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1280 lkb->lkb_rqmode = rqmode;
1286 static int grant_pending_convert(struct dlm_rsb *r, int high)
1288 struct dlm_lkb *lkb, *s;
1289 int hi, demoted, quit, grant_restart, demote_restart;
1297 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1298 demoted = is_demoted(lkb);
1299 if (can_be_granted(r, lkb, 0)) {
1300 grant_lock_pending(r, lkb);
1303 hi = max_t(int, lkb->lkb_rqmode, hi);
1304 if (!demoted && is_demoted(lkb))
1311 if (demote_restart && !quit) {
1316 return max_t(int, high, hi);
1319 static int grant_pending_wait(struct dlm_rsb *r, int high)
1321 struct dlm_lkb *lkb, *s;
1323 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1324 if (can_be_granted(r, lkb, 0))
1325 grant_lock_pending(r, lkb);
1327 high = max_t(int, lkb->lkb_rqmode, high);
1333 static void grant_pending_locks(struct dlm_rsb *r)
1335 struct dlm_lkb *lkb, *s;
1336 int high = DLM_LOCK_IV;
1338 DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1340 high = grant_pending_convert(r, high);
1341 high = grant_pending_wait(r, high);
1343 if (high == DLM_LOCK_IV)
1347 * If there are locks left on the wait/convert queue then send blocking
1348 * ASTs to granted locks based on the largest requested mode (high)
1349 * found above. FIXME: highbast < high comparison not valid for PR/CW.
1352 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1353 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1354 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1355 queue_bast(r, lkb, high);
1356 lkb->lkb_highbast = high;
/* send_bast_queue() -- queue blocking ASTs to each lock on 'head' that
   wants basts, hasn't already been basted for >= lkb's requested mode,
   and holds a mode incompatible with that request. */
1361 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1362 struct dlm_lkb *lkb)
1366 list_for_each_entry(gr, head, lkb_statequeue) {
1367 if (gr->lkb_bastaddr &&
1368 gr->lkb_highbast < lkb->lkb_rqmode &&
1369 !modes_compat(gr, lkb)) {
1370 queue_bast(r, gr, lkb->lkb_rqmode);
/* remember the highest mode basted so it isn't repeated */
1371 gr->lkb_highbast = lkb->lkb_rqmode;
/* send_blocking_asts() -- basts only to the grant queue (new request) */
1376 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1378 send_bast_queue(r, &r->res_grantqueue, lkb);
/* send_blocking_asts_all() -- basts to grant and convert queues */
1381 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1383 send_bast_queue(r, &r->res_grantqueue, lkb);
1384 send_bast_queue(r, &r->res_convertqueue, lkb);
1387 /* set_master(r, lkb) -- set the master nodeid of a resource
1389 The purpose of this function is to set the nodeid field in the given
1390 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1391 known, it can just be copied to the lkb and the function will return
1392 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1393 before it can be copied to the lkb.
1395 When the rsb nodeid is being looked up remotely, the initial lkb
1396 causing the lookup is kept on the ls_waiters list waiting for the
1397 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1398 on the rsb's res_lookup list until the master is verified.
1401 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1402 1: the rsb master is not available and the lkb has been placed on
1406 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1408 struct dlm_ls *ls = r->res_ls;
1409 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
/* uncertain master (e.g. after recovery): use the remembered nodeid but
   make this lkb the "first" so the next reply can confirm/deny it */
1411 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1412 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1413 r->res_first_lkid = lkb->lkb_id;
1414 lkb->lkb_nodeid = r->res_nodeid;
/* a lookup by another lkb is already in flight: park this one on the
   rsb's res_lookup list until the master is verified */
1418 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1419 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
/* res_nodeid == 0: we are the master */
1423 if (r->res_nodeid == 0) {
1424 lkb->lkb_nodeid = 0;
/* res_nodeid > 0: known remote master */
1428 if (r->res_nodeid > 0) {
1429 lkb->lkb_nodeid = r->res_nodeid;
/* only the unknown-master case (-1) remains */
1433 DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1435 dir_nodeid = dlm_dir_nodeid(r);
/* directory is remote: send a lookup and wait for the reply */
1437 if (dir_nodeid != our_nodeid) {
1438 r->res_first_lkid = lkb->lkb_id;
1439 send_lookup(r, lkb);
1444 /* It's possible for dlm_scand to remove an old rsb for
1445 this same resource from the toss list, us to create
1446 a new one, look up the master locally, and find it
1447 already exists just before dlm_scand does the
1448 dir_remove() on the previous rsb. */
/* we are the directory node: do the lookup locally */
1450 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1451 r->res_length, &ret_nodeid);
1454 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
/* directory says we are the master */
1458 if (ret_nodeid == our_nodeid) {
1459 r->res_first_lkid = 0;
1461 lkb->lkb_nodeid = 0;
/* remote master found; remember it in both rsb and lkb */
1463 r->res_first_lkid = lkb->lkb_id;
1464 r->res_nodeid = ret_nodeid;
1465 lkb->lkb_nodeid = ret_nodeid;
1470 static void process_lookup_list(struct dlm_rsb *r)
1472 struct dlm_lkb *lkb, *safe;
1474 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1475 list_del(&lkb->lkb_rsb_lookup);
1476 _request_lock(r, lkb);
1481 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
/* Called with the result of the first request sent to a presumed master.
   On confirmation, clear res_first_lkid and release the parked lookups;
   on a NOQUEUE rejection, promote a waiting lkb to be the new "first".
   NOTE(review): the switch/case labels selecting these paths are elided
   from this view. */
1483 static void confirm_master(struct dlm_rsb *r, int error)
1485 struct dlm_lkb *lkb;
/* nothing pending confirmation */
1487 if (!r->res_first_lkid)
/* master confirmed: let the queued lookup waiters proceed */
1493 r->res_first_lkid = 0;
1494 process_lookup_list(r);
1498 /* the remote master didn't queue our NOQUEUE request;
1499 make a waiting lkb the first_lkid */
1501 r->res_first_lkid = 0;
1503 if (!list_empty(&r->res_lookup)) {
1504 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1506 list_del(&lkb->lkb_rsb_lookup);
1507 r->res_first_lkid = lkb->lkb_id;
1508 _request_lock(r, lkb);
1514 log_error(r->res_ls, "confirm_master unknown error %d", error);
/* set_lock_args() -- validate dlm_lock() arguments and pack them into a
   dlm_args struct; the args are copied onto the lkb later, in
   validate_lock_args(), once the rsb is locked. */
1518 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1519 int namelen, uint32_t parent_lkid, void *ast,
1520 void *astarg, void *bast, struct dlm_args *args)
1524 /* check for invalid arg usage */
1526 if (mode < 0 || mode > DLM_LOCK_EX)
/* name length only matters for new requests, not converts */
1529 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
/* CANCEL is a dlm_unlock() flag, not valid here */
1532 if (flags & DLM_LKF_CANCEL)
/* QUECVT and CONVDEADLK are convert-only modifiers */
1535 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1538 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1541 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
/* EXPEDITE only applies to a new NL request, excluding these flags */
1544 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1547 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1550 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1553 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
/* VALBLK requires a caller-supplied LVB buffer */
1559 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1562 /* parent/child locks not yet supported */
/* a convert must name an existing lock id */
1566 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1569 /* these args will be copied to the lkb in validate_lock_args,
1570 it cannot be done now because when converting locks, fields in
1571 an active lkb cannot be modified before locking the rsb */
1573 args->flags = flags;
1574 args->astaddr = ast;
1575 args->astparam = (long) astarg;
1576 args->bastaddr = bast;
/* set_unlock_args() -- validate dlm_unlock() flags (only CANCEL, VALBLK,
   IVVALBLK, FORCEUNLOCK are legal) and pack them into dlm_args. */
1584 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1586 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1587 DLM_LKF_FORCEUNLOCK))
1590 args->flags = flags;
1591 args->astparam = (long) astarg;
/* validate_lock_args() -- check the lkb's current state against the
   requested operation, then copy the pre-validated args onto the lkb.
   Called with the rsb locked (see comment in set_lock_args). */
1595 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1596 struct dlm_args *args)
1600 if (args->flags & DLM_LKF_CONVERT) {
/* a master-copy lkb is owned by the remote node; can't convert here */
1601 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
/* QUECVT is only legal for grmode/rqmode pairs the matrix allows */
1604 if (args->flags & DLM_LKF_QUECVT &&
1605 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
/* only a granted lock can be converted */
1609 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
/* an outstanding remote operation is still in flight */
1612 if (lkb->lkb_wait_type)
/* all checks passed: commit args to the lkb */
1616 lkb->lkb_exflags = args->flags;
1617 lkb->lkb_sbflags = 0;
1618 lkb->lkb_astaddr = args->astaddr;
1619 lkb->lkb_astparam = args->astparam;
1620 lkb->lkb_bastaddr = args->bastaddr;
1621 lkb->lkb_rqmode = args->mode;
1622 lkb->lkb_lksb = args->lksb;
1623 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1624 lkb->lkb_ownpid = (int) current->pid;
/* validate_unlock_args() -- check that an unlock/cancel is legal for the
   lkb's current state, then commit the args.  FORCEUNLOCK skips the
   state checks (elided control flow suggests an early jump). */
1630 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
/* master copies are unlocked by the owning node, not here */
1634 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1637 if (args->flags & DLM_LKF_FORCEUNLOCK)
/* cancel only applies to a lock still waiting/converting */
1640 if (args->flags & DLM_LKF_CANCEL &&
1641 lkb->lkb_status == DLM_LKSTS_GRANTED)
/* plain unlock only applies to a granted lock */
1644 if (!(args->flags & DLM_LKF_CANCEL) &&
1645 lkb->lkb_status != DLM_LKSTS_GRANTED)
/* an outstanding remote operation is still in flight */
1649 if (lkb->lkb_wait_type)
1653 lkb->lkb_exflags = args->flags;
1654 lkb->lkb_sbflags = 0;
1655 lkb->lkb_astparam = args->astparam;
1663 * Four stage 4 varieties:
1664 * do_request(), do_convert(), do_unlock(), do_cancel()
1665 * These are called on the master node for the given lock and
1666 * from the central locking logic.
/* do_request() -- try to grant a new request immediately; otherwise queue
   it (unless NOQUEUE) and send blocking ASTs to conflicting holders. */
1669 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* 'now'=1: this is the initial attempt, EXPEDITE etc. may apply */
1673 if (can_be_granted(r, lkb, 1)) {
1675 queue_cast(r, lkb, 0);
1679 if (can_be_queued(lkb)) {
/* -EINPROGRESS tells the caller the request is queued, not failed */
1680 error = -EINPROGRESS;
1681 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1682 send_blocking_asts(r, lkb);
/* NOQUEUE request that can't be granted: fail with -EAGAIN */
1687 if (force_blocking_asts(lkb))
1688 send_blocking_asts_all(r, lkb);
1689 queue_cast(r, lkb, -EAGAIN);
/* do_convert() -- try to grant a mode conversion immediately; otherwise
   queue it on the convert queue (unless NOQUEUE) or fail with -EAGAIN. */
1695 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1699 /* changing an existing lock may allow others to be granted */
1701 if (can_be_granted(r, lkb, 1)) {
1703 queue_cast(r, lkb, 0);
/* the mode change may unblock other pending locks */
1704 grant_pending_locks(r);
1708 if (can_be_queued(lkb)) {
/* if can_be_granted() demoted this lock, the demotion itself may
   let other locks through -- regrant before queueing */
1709 if (is_demoted(lkb))
1710 grant_pending_locks(r);
1711 error = -EINPROGRESS;
1713 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1714 send_blocking_asts(r, lkb);
/* NOQUEUE conversion that can't be granted */
1719 if (force_blocking_asts(lkb))
1720 send_blocking_asts_all(r, lkb);
1721 queue_cast(r, lkb, -EAGAIN);
/* do_unlock() -- remove a granted lock, complete the caller's ast with
   -DLM_EUNLOCK, and regrant anything the removal unblocked. */
1727 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1729 remove_lock(r, lkb);
1730 queue_cast(r, lkb, -DLM_EUNLOCK);
1731 grant_pending_locks(r);
1732 return -DLM_EUNLOCK;
1735 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1736 skip the queue_cast(ECANCEL). It indicates that the request/convert
1737 completed (and queued a normal ast) just before the cancel; we don't
1738 want to clobber the sb_result for the normal ast with ECANCEL. */
/* do_cancel() -- revert an in-progress request/convert, complete with
   -DLM_ECANCEL, and regrant anything unblocked. */
1740 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1742 revert_lock(r, lkb);
1743 queue_cast(r, lkb, -DLM_ECANCEL);
1744 grant_pending_locks(r);
1745 return -DLM_ECANCEL;
1749 * Four stage 3 varieties:
1750 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1753 /* add a new lkb to a possibly new rsb, called by requesting process */
/* Decides local vs. remote: the elided branch between send_xxxx() and
   do_xxxx() is presumably an is_master()/is_remote() test -- the remote
   path sends the message, the local path executes directly. */
1755 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1759 /* set_master: sets lkb nodeid from r */
1761 error = set_master(r, lkb);
1770 /* receive_request() calls do_request() on remote node */
1771 error = send_request(r, lkb);
1773 error = do_request(r, lkb);
1778 /* change some property of an existing lkb, e.g. mode */
1780 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1785 /* receive_convert() calls do_convert() on remote node */
1786 error = send_convert(r, lkb);
1788 error = do_convert(r, lkb);
1793 /* remove an existing lkb from the granted queue */
1795 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1800 /* receive_unlock() calls do_unlock() on remote node */
1801 error = send_unlock(r, lkb);
1803 error = do_unlock(r, lkb)
1808 /* remove an existing lkb from the convert or wait queue */
1810 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1815 /* receive_cancel() calls do_cancel() on remote node */
1816 error = send_cancel(r, lkb);
1818 error = do_cancel(r, lkb);
1824 * Four stage 2 varieties:
1825 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
/* request_lock() -- stage 2 for a new request: validate args, find or
   create the rsb, publish the lock id to the caller's lksb, then hand
   off to stage 3. */
1828 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1829 int len, struct dlm_args *args)
1834 error = validate_lock_args(ls, lkb, args);
/* R_CREATE: make a new rsb if this resource name is unknown */
1838 error = find_rsb(ls, name, len, R_CREATE, &r);
/* give the caller its lock id before the operation can complete */
1845 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1847 error = _request_lock(r, lkb);
/* convert_lock() -- stage 2 for a conversion: the rsb already exists and
   is reached through the lkb. */
1856 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1857 struct dlm_args *args)
1862 r = lkb->lkb_resource;
1867 error = validate_lock_args(ls, lkb, args);
1871 error = _convert_lock(r, lkb);
/* unlock_lock() -- stage 2 for unlock */
1878 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1879 struct dlm_args *args)
1884 r = lkb->lkb_resource;
1889 error = validate_unlock_args(lkb, args);
1893 error = _unlock_lock(r, lkb);
/* cancel_lock() -- stage 2 for cancel; shares unlock arg validation */
1900 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1901 struct dlm_args *args)
1906 r = lkb->lkb_resource;
1911 error = validate_unlock_args(lkb, args);
1915 error = _cancel_lock(r, lkb);
1923 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
/* dlm_lock() -- public entry point (stage 1).  Splits into convert vs.
   new request, normalizing the return value: -EINPROGRESS (async
   completion pending) and -EAGAIN on a fresh request are treated as
   success per the elided epilogue logic visible below. */
1926 int dlm_lock(dlm_lockspace_t *lockspace,
1928 struct dlm_lksb *lksb,
1931 unsigned int namelen,
1932 uint32_t parent_lkid,
1933 void (*ast) (void *astarg),
1935 void (*bast) (void *astarg, int mode))
1938 struct dlm_lkb *lkb;
1939 struct dlm_args args;
1940 int error, convert = flags & DLM_LKF_CONVERT;
1942 ls = dlm_find_lockspace_local(lockspace);
/* convert: look up the existing lkb by id; request: create a new one */
1949 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1951 error = create_lkb(ls, &lkb);
1956 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1957 astarg, bast, &args);
1962 error = convert_lock(ls, lkb, &args);
1964 error = request_lock(ls, lkb, name, namelen, &args);
1966 if (error == -EINPROGRESS)
/* NOTE(review): elided lines here appear to drop the lkb reference on
   the convert or error paths -- confirm against full source */
1969 if (convert || error)
1971 if (error == -EAGAIN)
1974 unlock_recovery(ls);
1975 dlm_put_lockspace(ls);
/* dlm_unlock() -- public entry point (stage 1).  DLM_LKF_CANCEL selects
   cancel_lock(), otherwise unlock_lock(); -DLM_EUNLOCK / -DLM_ECANCEL
   are the expected "success" results. */
1979 int dlm_unlock(dlm_lockspace_t *lockspace,
1982 struct dlm_lksb *lksb,
1986 struct dlm_lkb *lkb;
1987 struct dlm_args args;
1990 ls = dlm_find_lockspace_local(lockspace);
1996 error = find_lkb(ls, lkid, &lkb);
2000 error = set_unlock_args(flags, astarg, &args);
2004 if (flags & DLM_LKF_CANCEL)
2005 error = cancel_lock(ls, lkb, &args);
2007 error = unlock_lock(ls, lkb, &args);
/* these "errors" are the normal completion codes for unlock/cancel */
2009 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2014 unlock_recovery(ls);
2015 dlm_put_lockspace(ls);
2020 * send/receive routines for remote operations and replies
2024 * send_request receive_request
2025 * send_convert receive_convert
2026 * send_unlock receive_unlock
2027 * send_cancel receive_cancel
2028 * send_grant receive_grant
2029 * send_bast receive_bast
2030 * send_lookup receive_lookup
2031 * send_remove receive_remove
2034 * receive_request_reply send_request_reply
2035 * receive_convert_reply send_convert_reply
2036 * receive_unlock_reply send_unlock_reply
2037 * receive_cancel_reply send_cancel_reply
2038 * receive_lookup_reply send_lookup_reply
/* create_message() -- allocate and header-initialize an outgoing message.
   Message length is the base struct plus type-dependent extra space:
   the resource name for REQUEST/LOOKUP/REMOVE, the LVB for lock-carrying
   types.  Returns the zeroed message and the lowcomms handle to commit. */
2041 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2042 int to_nodeid, int mstype,
2043 struct dlm_message **ms_ret,
2044 struct dlm_mhandle **mh_ret)
2046 struct dlm_message *ms;
2047 struct dlm_mhandle *mh;
2049 int mb_len = sizeof(struct dlm_message);
2052 case DLM_MSG_REQUEST:
2053 case DLM_MSG_LOOKUP:
2054 case DLM_MSG_REMOVE:
/* these carry the resource name in m_extra */
2055 mb_len += r->res_length;
2057 case DLM_MSG_CONVERT:
2058 case DLM_MSG_UNLOCK:
2059 case DLM_MSG_REQUEST_REPLY:
2060 case DLM_MSG_CONVERT_REPLY:
/* these may carry the lock value block in m_extra */
2062 if (lkb && lkb->lkb_lvbptr)
2063 mb_len += r->res_ls->ls_lvblen;
2067 /* get_buffer gives us a message handle (mh) that we need to
2068 pass into lowcomms_commit and a message buffer (mb) that we
2069 write our data into */
2071 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2075 memset(mb, 0, mb_len);
2077 ms = (struct dlm_message *) mb;
2079 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2080 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2081 ms->m_header.h_nodeid = dlm_our_nodeid();
2082 ms->m_header.h_length = mb_len;
2083 ms->m_header.h_cmd = DLM_MSG;
2085 ms->m_type = mstype;
2092 /* further lowcomms enhancements or alternate implementations may make
2093 the return value from this function useful at some point */
/* send_message() -- byte-swap to wire order and commit to lowcomms. */
2095 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2097 dlm_message_out(ms);
2098 dlm_lowcomms_commit_buffer(mh);
/* send_args() -- copy all lkb/rsb state a remote node might need into the
   message; receivers pick out only the fields relevant per type. */
2102 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2103 struct dlm_message *ms)
2105 ms->m_nodeid = lkb->lkb_nodeid;
2106 ms->m_pid = lkb->lkb_ownpid;
2107 ms->m_lkid = lkb->lkb_id;
2108 ms->m_remid = lkb->lkb_remid;
2109 ms->m_exflags = lkb->lkb_exflags;
2110 ms->m_sbflags = lkb->lkb_sbflags;
2111 ms->m_flags = lkb->lkb_flags;
2112 ms->m_lvbseq = lkb->lkb_lvbseq;
2113 ms->m_status = lkb->lkb_status;
2114 ms->m_grmode = lkb->lkb_grmode;
2115 ms->m_rqmode = lkb->lkb_rqmode;
2116 ms->m_hash = r->res_hash;
2118 /* m_result and m_bastmode are set from function args,
2119 not from lkb fields */
/* encode which ast callbacks the owner registered as flag bits */
2121 if (lkb->lkb_bastaddr)
2122 ms->m_asts |= AST_BAST;
2123 if (lkb->lkb_astaddr)
2124 ms->m_asts |= AST_COMP;
/* m_extra carries the resource name for request/lookup, else the LVB */
2126 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2127 memcpy(ms->m_extra, r->res_name, r->res_length);
2129 else if (lkb->lkb_lvbptr)
2130 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
/* send_common() -- shared path for request/convert/unlock/cancel sends:
   register the lkb on the waiters list (expecting a reply), build and
   send the message to the rsb's master; on failure (elided path) the lkb
   is removed from waiters again. */
2134 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2136 struct dlm_message *ms;
2137 struct dlm_mhandle *mh;
2138 int to_nodeid, error;
/* must go on waiters before sending so the reply always finds it */
2140 add_to_waiters(lkb, mstype);
2142 to_nodeid = r->res_nodeid;
2144 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2148 send_args(r, lkb, ms);
2150 error = send_message(mh, ms);
/* error path: undo the waiters registration */
2156 remove_from_waiters(lkb);
2160 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2162 return send_common(r, lkb, DLM_MSG_REQUEST);
2165 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2169 error = send_common(r, lkb, DLM_MSG_CONVERT);
2171 /* down conversions go without a reply from the master */
2172 if (!error && down_conversion(lkb)) {
2173 remove_from_waiters(lkb);
/* fake a successful convert reply locally via the stub message */
2174 r->res_ls->ls_stub_ms.m_result = 0;
2175 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2181 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2182 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2183 that the master is still correct. */
2185 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2187 return send_common(r, lkb, DLM_MSG_UNLOCK);
2190 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2192 return send_common(r, lkb, DLM_MSG_CANCEL);
/* send_grant() -- master -> owner notification that the lock was granted;
   no reply is expected, so the lkb does not go on the waiters list. */
2195 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197 struct dlm_message *ms;
2198 struct dlm_mhandle *mh;
2199 int to_nodeid, error;
2201 to_nodeid = lkb->lkb_nodeid;
2203 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2207 send_args(r, lkb, ms);
2211 error = send_message(mh, ms);
/* send_bast() -- master -> owner blocking-ast notification for 'mode';
   note create_message() is passed NULL lkb, so no LVB space is added. */
2216 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2218 struct dlm_message *ms;
2219 struct dlm_mhandle *mh;
2220 int to_nodeid, error;
2222 to_nodeid = lkb->lkb_nodeid;
2224 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2228 send_args(r, lkb, ms);
2230 ms->m_bastmode = mode;
2232 error = send_message(mh, ms);
/* send_lookup() -- ask the directory node who masters this resource;
   a reply is expected, so the lkb goes on the waiters list first. */
2237 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2239 struct dlm_message *ms;
2240 struct dlm_mhandle *mh;
2241 int to_nodeid, error;
2243 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2245 to_nodeid = dlm_dir_nodeid(r);
2247 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2251 send_args(r, lkb, ms);
2253 error = send_message(mh, ms);
/* error path: undo the waiters registration */
2259 remove_from_waiters(lkb);
/* send_remove() -- tell the directory node to drop its entry for this
   resource; fire-and-forget (no lkb, no reply). */
2263 static int send_remove(struct dlm_rsb *r)
2265 struct dlm_message *ms;
2266 struct dlm_mhandle *mh;
2267 int to_nodeid, error;
2269 to_nodeid = dlm_dir_nodeid(r);
2271 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2275 memcpy(ms->m_extra, r->res_name, r->res_length);
2276 ms->m_hash = r->res_hash;
2278 error = send_message(mh, ms);
/* send_common_reply() -- shared path for the four xxx_reply senders:
   build a 'mstype' message back to the lkb's owner node.  (The rv
   parameter and its assignment to m_result are elided in this view.) */
2283 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2286 struct dlm_message *ms;
2287 struct dlm_mhandle *mh;
2288 int to_nodeid, error;
2290 to_nodeid = lkb->lkb_nodeid;
2292 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2296 send_args(r, lkb, ms);
2300 error = send_message(mh, ms);
2305 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2307 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2310 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2312 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2315 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2317 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2320 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2322 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
/* send_lookup_reply() -- directory node answers a lookup: echo the
   requester's lkid and return the master nodeid.  Uses the lockspace's
   stub rsb since no real rsb is involved on the dir node. */
2325 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2326 int ret_nodeid, int rv)
2328 struct dlm_rsb *r = &ls->ls_stub_rsb;
2329 struct dlm_message *ms;
2330 struct dlm_mhandle *mh;
2331 int error, nodeid = ms_in->m_header.h_nodeid;
2333 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
/* m_lkid lets the requester find its waiting lkb */
2337 ms->m_lkid = ms_in->m_lkid;
2339 ms->m_nodeid = ret_nodeid;
2341 error = send_message(mh, ms);
2346 /* which args we save from a received message depends heavily on the type
2347 of message, unlike the send side where we can safely send everything about
2348 the lkb for any type of message */
/* receive_flags() -- absorb exflags and the low 16 bits of lkb_flags from
   a request-side message; the high 16 bits are node-local and kept. */
2350 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2352 lkb->lkb_exflags = ms->m_exflags;
2353 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2354 (ms->m_flags & 0x0000FFFF);
/* receive_flags_reply() -- same, but from a reply: sbflags instead of
   exflags. */
2357 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2359 lkb->lkb_sbflags = ms->m_sbflags;
2360 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2361 (ms->m_flags & 0x0000FFFF);
/* receive_extralen() -- length of the variable m_extra payload (name or
   LVB), derived from the wire header length. */
2364 static int receive_extralen(struct dlm_message *ms)
2366 return (ms->m_header.h_length - sizeof(struct dlm_message));
/* receive_lvb() -- if the lock uses a value block, copy it out of
   m_extra, allocating the lkb's LVB buffer on first use. */
2369 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2370 struct dlm_message *ms)
2374 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2375 if (!lkb->lkb_lvbptr)
2376 lkb->lkb_lvbptr = allocate_lvb(ls);
/* still NULL: allocation failed (error return elided) */
2377 if (!lkb->lkb_lvbptr)
2379 len = receive_extralen(ms);
2380 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
/* receive_request_args() -- initialize a fresh master-copy lkb from an
   incoming request.  The remote ast pointers can't be used here, so the
   m_asts bits are stored as booleans-in-pointers just to record whether
   the owner wants each callback. */
2385 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2386 struct dlm_message *ms)
2388 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2389 lkb->lkb_ownpid = ms->m_pid;
/* the sender's lkid becomes our remote id for replies */
2390 lkb->lkb_remid = ms->m_lkid;
2391 lkb->lkb_grmode = DLM_LOCK_IV;
2392 lkb->lkb_rqmode = ms->m_rqmode;
2393 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2394 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2396 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2398 if (receive_lvb(ls, lkb, ms))
/* receive_convert_args() -- sanity-check and absorb a convert request
   into an existing master-copy lkb. */
2404 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2405 struct dlm_message *ms)
/* the convert must come from the node that owns this lock */
2407 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2408 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2409 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2410 lkb->lkb_id, lkb->lkb_remid);
2414 if (!is_master_copy(lkb))
2417 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2420 if (receive_lvb(ls, lkb, ms))
2423 lkb->lkb_rqmode = ms->m_rqmode;
2424 lkb->lkb_lvbseq = ms->m_lvbseq;
/* receive_unlock_args() -- absorb an unlock request (LVB only). */
2429 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2430 struct dlm_message *ms)
2432 if (!is_master_copy(lkb))
2434 if (receive_lvb(ls, lkb, ms))
2439 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2440 uses to send a reply and that the remote end uses to process the reply. */
2442 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2444 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2445 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2446 lkb->lkb_remid = ms->m_lkid;
/* receive_request() -- master side of a new lock request: create a
   master-copy lkb, find/create the rsb, run do_request() and reply.
   On setup failure the stub lkb is used to send an error reply. */
2449 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2451 struct dlm_lkb *lkb;
2455 error = create_lkb(ls, &lkb);
2459 receive_flags(lkb, ms);
/* mark as master copy: the real owner lives on the sending node */
2460 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2461 error = receive_request_args(ls, lkb, ms);
2467 namelen = receive_extralen(ms);
/* R_MASTER: we must actually be the master for this resource */
2469 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2478 error = do_request(r, lkb);
2479 send_request_reply(r, lkb, error);
/* -EINPROGRESS means the request was queued, not an error */
2484 if (error == -EINPROGRESS)
/* failure path: reply through the stub lkb/rsb */
2491 setup_stub_lkb(ls, ms);
2492 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_convert() -- master side of a conversion: locate the master
   copy by the sender's remid, run do_convert() and reply -- except for
   down-conversions, which the owner does not wait on (reply == 0). */
2495 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2497 struct dlm_lkb *lkb;
2499 int error, reply = 1;
2501 error = find_lkb(ls, ms->m_remid, &lkb);
2505 r = lkb->lkb_resource;
2510 receive_flags(lkb, ms);
2511 error = receive_convert_args(ls, lkb, ms);
/* down conversions are async on the owner side: no reply expected */
2514 reply = !down_conversion(lkb);
2516 error = do_convert(r, lkb);
2519 send_convert_reply(r, lkb, error);
/* failure path: reply through the stub lkb/rsb */
2527 setup_stub_lkb(ls, ms);
2528 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_unlock() -- master side of an unlock: find the master copy,
   absorb args (possible LVB), do_unlock() and reply. */
2531 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2533 struct dlm_lkb *lkb;
2537 error = find_lkb(ls, ms->m_remid, &lkb);
2541 r = lkb->lkb_resource;
2546 receive_flags(lkb, ms);
2547 error = receive_unlock_args(ls, lkb, ms);
2551 error = do_unlock(r, lkb);
2553 send_unlock_reply(r, lkb, error);
/* failure path: reply through the stub lkb/rsb */
2561 setup_stub_lkb(ls, ms);
2562 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_cancel() -- master side of a cancel; no extra args beyond
   flags, just do_cancel() and reply. */
2565 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2567 struct dlm_lkb *lkb;
2571 error = find_lkb(ls, ms->m_remid, &lkb);
2575 receive_flags(lkb, ms);
2577 r = lkb->lkb_resource;
2582 error = do_cancel(r, lkb);
2583 send_cancel_reply(r, lkb, error);
/* failure path: reply through the stub lkb/rsb */
2591 setup_stub_lkb(ls, ms);
2592 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* receive_grant() -- owner side of an async grant from the master:
   update flags, move the process-copy lkb to granted, queue the ast. */
2595 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2597 struct dlm_lkb *lkb;
2601 error = find_lkb(ls, ms->m_remid, &lkb);
2603 log_error(ls, "receive_grant no lkb");
/* grants only arrive for locks we own (process copies) */
2606 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2608 r = lkb->lkb_resource;
2613 receive_flags_reply(lkb, ms);
2614 grant_lock_pc(r, lkb, ms);
2615 queue_cast(r, lkb, 0);
/* receive_bast() -- owner side of an async blocking ast: just queue the
   bast callback with the conflicting mode. */
2622 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2624 struct dlm_lkb *lkb;
2628 error = find_lkb(ls, ms->m_remid, &lkb);
2630 log_error(ls, "receive_bast no lkb");
2633 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2635 r = lkb->lkb_resource;
2640 queue_bast(r, lkb, ms->m_bastmode);
/* receive_lookup() -- directory node answers "who masters this name?".
   If the lookup resolves to ourselves, handle it directly as a request
   (saves the requester a round trip); otherwise send a lookup reply. */
2647 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2649 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2651 from_nodeid = ms->m_header.h_nodeid;
2652 our_nodeid = dlm_our_nodeid();
2654 len = receive_extralen(ms);
/* sanity: the sender should only have directed this at the dir node */
2656 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2657 if (dir_nodeid != our_nodeid) {
2658 log_error(ls, "lookup dir_nodeid %d from %d",
2659 dir_nodeid, from_nodeid);
2665 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2667 /* Optimization: we're master so treat lookup as a request */
2668 if (!error && ret_nodeid == our_nodeid) {
2669 receive_request(ls, ms);
2673 send_lookup_reply(ls, ms, ret_nodeid, error);
/* receive_remove() -- directory node drops its entry for a resource
   name; no reply. */
2676 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2678 int len, dir_nodeid, from_nodeid;
2680 from_nodeid = ms->m_header.h_nodeid;
2682 len = receive_extralen(ms);
2684 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2685 if (dir_nodeid != dlm_our_nodeid()) {
2686 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2687 dir_nodeid, from_nodeid);
2691 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
/* receive_request_reply() -- owner side of the master's answer to our
   request (or, via the dir-node optimization, to our lookup).  Dispatch
   on m_result: -EAGAIN fails the request, 0/-EINPROGRESS record the
   grant/queue, other codes retry via _request_lock().
   NOTE(review): the switch/case labels are elided in this view. */
2694 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2696 struct dlm_lkb *lkb;
2700 error = find_lkb(ls, ms->m_remid, &lkb);
2702 log_error(ls, "receive_request_reply no lkb");
2705 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
/* save wait type before removal -- needed for the LOOKUP case below */
2707 mstype = lkb->lkb_wait_type;
2708 error = remove_from_waiters(lkb);
2710 log_error(ls, "receive_request_reply not on waiters");
2714 /* this is the value returned from do_request() on the master */
2715 error = ms->m_result;
2717 r = lkb->lkb_resource;
2721 /* Optimization: the dir node was also the master, so it took our
2722 lookup as a request and sent request reply instead of lookup reply */
2723 if (mstype == DLM_MSG_LOOKUP) {
2724 r->res_nodeid = ms->m_header.h_nodeid;
2725 lkb->lkb_nodeid = r->res_nodeid;
2730 /* request would block (be queued) on remote master;
2731 the unhold undoes the original ref from create_lkb()
2732 so it leads to the lkb being freed */
2733 queue_cast(r, lkb, -EAGAIN);
2734 confirm_master(r, -EAGAIN);
2740 /* request was queued or granted on remote master */
2741 receive_flags_reply(lkb, ms);
/* master's id for this lock, used in all future messages */
2742 lkb->lkb_remid = ms->m_lkid;
2744 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2746 grant_lock_pc(r, lkb, ms);
2747 queue_cast(r, lkb, 0);
2749 confirm_master(r, error);
2754 /* find_rsb failed to find rsb or rsb wasn't master */
/* forget the stale master and rerun stage 3 (new lookup) */
2756 lkb->lkb_nodeid = -1;
2757 _request_lock(r, lkb);
2761 log_error(ls, "receive_request_reply error %d", error);
/* __receive_convert_reply() -- apply a convert reply to the lkb with the
   rsb already locked/held by the caller.  Also used with a stub message
   to complete local down-conversions (see send_convert()). */
2770 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2771 struct dlm_message *ms)
2773 int error = ms->m_result;
2775 /* this is the value returned from do_convert() on the master */
2779 /* convert would block (be queued) on remote master */
2780 queue_cast(r, lkb, -EAGAIN);
2784 /* convert was queued on remote master */
2786 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2790 /* convert was granted on remote master */
2791 receive_flags_reply(lkb, ms);
2792 grant_lock_pc(r, lkb, ms);
2793 queue_cast(r, lkb, 0);
2797 log_error(r->res_ls, "receive_convert_reply error %d", error);
/* _receive_convert_reply() -- lock/hold the rsb around the real work
   (the hold/lock calls are elided in this view). */
2801 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2803 struct dlm_rsb *r = lkb->lkb_resource;
2808 __receive_convert_reply(r, lkb, ms);
/* receive_convert_reply() -- wire entry: locate lkb, drop it from the
   waiters list, then apply the reply. */
2814 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2816 struct dlm_lkb *lkb;
2819 error = find_lkb(ls, ms->m_remid, &lkb);
2821 log_error(ls, "receive_convert_reply no lkb");
2824 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2826 error = remove_from_waiters(lkb);
2828 log_error(ls, "receive_convert_reply not on waiters");
2832 _receive_convert_reply(lkb, ms);
/* _receive_unlock_reply() -- apply the master's unlock result to our
   process copy: -DLM_EUNLOCK completes the unlock locally. */
2837 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2839 struct dlm_rsb *r = lkb->lkb_resource;
2840 int error = ms->m_result;
2845 /* this is the value returned from do_unlock() on the master */
2849 receive_flags_reply(lkb, ms);
2850 remove_lock_pc(r, lkb);
2851 queue_cast(r, lkb, -DLM_EUNLOCK);
2854 log_error(r->res_ls, "receive_unlock_reply error %d", error);
/* receive_unlock_reply() -- wire entry: locate lkb, drop from waiters,
   apply the reply. */
2861 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2863 struct dlm_lkb *lkb;
2866 error = find_lkb(ls, ms->m_remid, &lkb);
2868 log_error(ls, "receive_unlock_reply no lkb");
2871 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2873 error = remove_from_waiters(lkb);
2875 log_error(ls, "receive_unlock_reply not on waiters");
2879 _receive_unlock_reply(lkb, ms);
/* _receive_cancel_reply() -- apply the master's cancel result:
   -DLM_ECANCEL reverts the lock locally and completes the ast. */
2884 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2886 struct dlm_rsb *r = lkb->lkb_resource;
2887 int error = ms->m_result;
2892 /* this is the value returned from do_cancel() on the master */
2896 receive_flags_reply(lkb, ms);
2897 revert_lock_pc(r, lkb);
2898 queue_cast(r, lkb, -DLM_ECANCEL);
2901 log_error(r->res_ls, "receive_cancel_reply error %d", error);
/* receive_cancel_reply() -- wire entry: locate lkb, drop from waiters,
   apply the reply. */
2908 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2910 struct dlm_lkb *lkb;
2913 error = find_lkb(ls, ms->m_remid, &lkb);
2915 log_error(ls, "receive_cancel_reply no lkb");
2918 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2920 error = remove_from_waiters(lkb);
2922 log_error(ls, "receive_cancel_reply not on waiters");
2926 _receive_cancel_reply(lkb, ms);
/* receive_lookup_reply() -- directory node told us who the master is.
   Note the lkb is found by m_lkid (our id echoed back), not m_remid,
   since no master-side lkb exists yet.  Record the master in the rsb,
   restart the original request, then release any parked lookups. */
2931 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2933 struct dlm_lkb *lkb;
2935 int error, ret_nodeid;
2937 error = find_lkb(ls, ms->m_lkid, &lkb);
2939 log_error(ls, "receive_lookup_reply no lkb");
2943 error = remove_from_waiters(lkb);
2945 log_error(ls, "receive_lookup_reply not on waiters");
2949 /* this is the value returned by dlm_dir_lookup on dir node
2950 FIXME: will a non-zero error ever be returned? */
2951 error = ms->m_result;
2953 r = lkb->lkb_resource;
2957 ret_nodeid = ms->m_nodeid;
/* we are the master ourselves: no confirmation round needed */
2958 if (ret_nodeid == dlm_our_nodeid()) {
2961 r->res_first_lkid = 0;
2963 /* set_master() will copy res_nodeid to lkb_nodeid */
2964 r->res_nodeid = ret_nodeid;
2967 _request_lock(r, lkb);
2970 process_lookup_list(r);
/* dlm_receive_message() -- top-level dispatcher for every incoming DLM
   message: resolve the lockspace, coordinate with recovery (queueing
   messages while locking is stopped), then fan out by m_type. */
2978 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2980 struct dlm_message *ms = (struct dlm_message *) hd;
2987 ls = dlm_find_lockspace_global(hd->h_lockspace);
/* unknown lockspace (may have been left/removed): drop the message */
2989 log_print("drop message %d from %d for unknown lockspace %d",
2990 ms->m_type, nodeid, hd->h_lockspace);
2994 /* recovery may have just ended leaving a bunch of backed-up requests
2995 in the requestqueue; wait while dlm_recoverd clears them */
2998 dlm_wait_requestqueue(ls);
3000 /* recovery may have just started while there were a bunch of
3001 in-flight requests -- save them in requestqueue to be processed
3002 after recovery. we can't let dlm_recvd block on the recovery
3003 lock. if dlm_recoverd is calling this function to clear the
3004 requestqueue, it needs to be interrupted (-EINTR) if another
3005 recovery operation is starting. */
3008 if (dlm_locking_stopped(ls)) {
3010 dlm_add_requestqueue(ls, nodeid, hd);
3015 if (lock_recovery_try(ls))
3020 switch (ms->m_type) {
3022 /* messages sent to a master node */
3024 case DLM_MSG_REQUEST:
3025 receive_request(ls, ms);
3028 case DLM_MSG_CONVERT:
3029 receive_convert(ls, ms);
3032 case DLM_MSG_UNLOCK:
3033 receive_unlock(ls, ms);
3036 case DLM_MSG_CANCEL:
3037 receive_cancel(ls, ms);
3040 /* messages sent from a master node (replies to above) */
3042 case DLM_MSG_REQUEST_REPLY:
3043 receive_request_reply(ls, ms);
3046 case DLM_MSG_CONVERT_REPLY:
3047 receive_convert_reply(ls, ms);
3050 case DLM_MSG_UNLOCK_REPLY:
3051 receive_unlock_reply(ls, ms);
3054 case DLM_MSG_CANCEL_REPLY:
3055 receive_cancel_reply(ls, ms);
3058 /* messages sent from a master node (only two types of async msg) */
3061 receive_grant(ls, ms);
3065 receive_bast(ls, ms);
3068 /* messages sent to a dir node */
3070 case DLM_MSG_LOOKUP:
3071 receive_lookup(ls, ms);
3074 case DLM_MSG_REMOVE:
3075 receive_remove(ls, ms);
3078 /* messages sent from a dir node (remove has no reply) */
3080 case DLM_MSG_LOOKUP_REPLY:
3081 receive_lookup_reply(ls, ms);
3085 log_error(ls, "unknown message type %d", ms->m_type);
3088 unlock_recovery(ls);
3090 dlm_put_lockspace(ls);
/* Recovery handling for a lock waiting on a convert reply from a dead
   master.  A PR<->CW "middle" conversion is completed locally with a faked
   -EINPROGRESS stub reply and marked so its real granted mode can be
   resolved after all locks are rebuilt (recover_conversion); an
   up-conversion is simply flagged to be resent to the new master. */
3100 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3102 	if (middle_conversion(lkb)) {
3104 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3105 		_remove_from_waiters(lkb);
		/* feed the stub reply through the normal reply path */
3106 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3108 		/* Same special case as in receive_rcom_lock_args() */
3109 		lkb->lkb_grmode = DLM_LOCK_IV;
3110 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3113 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		/* up-conversion: resend to the new master after recovery */
3114 		lkb->lkb_flags |= DLM_IFL_RESEND;
3117 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3118 	   conversions are async; there's no reply from the remote master */
3121 /* A waiting lkb needs recovery if the master node has failed, or
3122    the master node is changing (only when no directory is used) */
3124 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
	/* master node is in the removed-nodes list: definitely needs recovery */
3126 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
	/* with a directory, a surviving master keeps mastering the rsb */
3129 	if (!dlm_no_directory(ls))
	/* no directory: master may migrate; recover if the computed dir node
	   no longer matches the node we were waiting on */
3132 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3138 /* Recovery for locks that are waiting for replies from nodes that are now
3139    gone.  We can just complete unlocks and cancels by faking a reply from the
3140    dead node.  Requests and up-conversions we flag to be resent after
3141    recovery.  Down-conversions can just be completed with a fake reply like
3142    unlocks.  Conversions between PR and CW need special attention. */
/* First (pre) phase of waiter recovery: walk ls_waiters under the waiters
   mutex and either flag each affected lkb for resend or complete it with a
   stub reply, depending on what kind of reply it was waiting for. */
3144 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3146 	struct dlm_lkb *lkb, *safe;
3148 	mutex_lock(&ls->ls_waiters_mutex);
	/* _safe iteration: the stub-reply paths below remove lkbs from the list */
3150 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3151 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3152 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3154 		/* all outstanding lookups, regardless of destination will be
3155 		   resent after recovery is done */
3157 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3158 			lkb->lkb_flags |= DLM_IFL_RESEND;
		/* waiters whose master survived (and isn't changing) are fine */
3162 		if (!waiter_needs_recovery(ls, lkb))
3165 		switch (lkb->lkb_wait_type) {
3167 		case DLM_MSG_REQUEST:
3168 			lkb->lkb_flags |= DLM_IFL_RESEND;
3171 		case DLM_MSG_CONVERT:
3172 			recover_convert_waiter(ls, lkb);
		/* unlock/cancel: complete locally by faking the dead master's
		   reply via the lockspace's stub message */
3175 		case DLM_MSG_UNLOCK:
3177 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3178 			_remove_from_waiters(lkb);
3179 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3183 		case DLM_MSG_CANCEL:
3185 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3186 			_remove_from_waiters(lkb);
3187 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3192 			log_error(ls, "invalid lkb wait_type %d",
3193 				  lkb->lkb_wait_type);
3196 	mutex_unlock(&ls->ls_waiters_mutex);
/* Pop one RESEND-flagged lkb off the waiters list for reprocessing.
   Returns the lkb's wait type (the message type it was waiting on) via rv
   and the lkb via lkb_ret; used by dlm_recover_waiters_post() to drain the
   list one entry at a time. */
3199 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3201 	struct dlm_lkb *lkb;
3204 	mutex_lock(&ls->ls_waiters_mutex);
3205 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3206 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			/* remember what op to redo before clearing state */
3207 			rv = lkb->lkb_wait_type;
3208 			_remove_from_waiters(lkb);
3209 			lkb->lkb_flags &= ~DLM_IFL_RESEND;
3213 	mutex_unlock(&ls->ls_waiters_mutex);
3221 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3222    master or dir-node for r.  Processing the lkb may result in it being placed
/* Second (post) phase of waiter recovery: re-drive each flagged operation
   through the normal stage-3 paths (_request_lock/_convert_lock), which
   re-resolve the master now that recovery has rebuilt it.  Aborts with an
   error if another recovery starts while running. */
3225 int dlm_recover_waiters_post(struct dlm_ls *ls)
3227 	struct dlm_lkb *lkb;
3229 	int error = 0, mstype;
	/* bail out if a new recovery began; this pass will be redone */
3232 		if (dlm_locking_stopped(ls)) {
3233 			log_debug(ls, "recover_waiters_post aborted");
3238 		mstype = remove_resend_waiter(ls, &lkb);
3242 		r = lkb->lkb_resource;
3244 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3245 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
		/* redo the operation the lkb was waiting on */
3249 		case DLM_MSG_LOOKUP:
3252 			_request_lock(r, lkb);
			/* we may be the dir node for r now */
3254 				confirm_master(r, 0);
3259 		case DLM_MSG_REQUEST:
3262 			_request_lock(r, lkb);
3267 		case DLM_MSG_CONVERT:
3270 			_convert_lock(r, lkb);
3276 			log_error(ls, "recover_waiters_post type %d", mstype);
/* Remove and free every lkb on the given rsb queue (grant/convert/wait)
   for which the test callback returns true.  Marks the rsb LOCKS_PURGED so
   dlm_grant_after_purge() will revisit it and grant whatever the purge
   unblocked. */
3283 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3284 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3286 	struct dlm_ls *ls = r->res_ls;
3287 	struct dlm_lkb *lkb, *safe;
	/* _safe iteration: matching lkbs are removed from the queue */
3289 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3290 		if (test(ls, lkb)) {
3291 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3293 			/* this put should free the lkb */
3294 			if (!dlm_put_lkb(lkb))
3295 				log_error(ls, "purged lkb not released");
/* purge_queue() predicate: true for a master-copy lkb whose owning node
   has left the lockspace. */
3300 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3302 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
/* purge_queue() predicate: true for any master-copy lkb, regardless of
   whether its owner is still alive. */
3305 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3307 	return is_master_copy(lkb);
/* Purge, from all three queues of r, master-copy locks held by nodes that
   have left the lockspace. */
3310 static void purge_dead_locks(struct dlm_rsb *r)
3312 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3313 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3314 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
/* Purge every master-copy lock from all three queues of r; used when this
   node is no longer master of the rsb. */
3317 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3319 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3320 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3321 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3324 /* Get rid of locks held by nodes that are gone. */
/* Walks every root rsb in the lockspace (under ls_root_sem) and purges
   dead nodes' locks from it via purge_dead_locks(). */
3326 int dlm_purge_locks(struct dlm_ls *ls)
3330 	log_debug(ls, "dlm_purge_locks");
3332 	down_write(&ls->ls_root_sem);
3333 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3337 		purge_dead_locks(r);
3343 	up_write(&ls->ls_root_sem);
/* Scan one rsb hash bucket for an rsb flagged LOCKS_PURGED; clear the flag
   and return that rsb (NULL if the bucket has none).  Callers loop so each
   call consumes at most one purged rsb per bucket scan. */
3348 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3350 	struct dlm_rsb *r, *r_ret = NULL;
3352 	read_lock(&ls->ls_rsbtbl[bucket].lock);
3353 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3354 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
		/* clear before returning so the rsb isn't found twice */
3357 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3361 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
/* After purging dead locks, revisit every rsb that lost locks (flagged
   LOCKS_PURGED) and grant any pending locks the purge unblocked. */
3365 void dlm_grant_after_purge(struct dlm_ls *ls)
3370 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3371 		r = find_purged_rsb(ls, i);
3376 		grant_pending_locks(r);
3377 		confirm_master(r, 0);
/* Find, on one rsb queue, the lkb whose owning node and remote lock id
   match (nodeid, remid); NULL if not present. */
3384 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3387 	struct dlm_lkb *lkb;
3389 	list_for_each_entry(lkb, head, lkb_statequeue) {
3390 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
/* Search all three queues of r (grant, convert, wait -- in that order) for
   the lkb matching (nodeid, remid); NULL if not found anywhere. */
3396 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3399 	struct dlm_lkb *lkb;
3401 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3404 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3407 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
/* Populate a newly created master-copy lkb from the rcom_lock payload sent
   by the lock's owner during recovery (the receive-side twin of the pack
   done on the owning node).  Copies ids, flags, modes, ast indicators and,
   when VALBLK is set, allocates and fills the lvb from the trailing bytes
   of the rcom buffer. */
3413 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3414 				  struct dlm_rsb *r, struct dlm_rcom *rc)
3416 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3419 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3420 	lkb->lkb_ownpid = rl->rl_ownpid;
3421 	lkb->lkb_remid = rl->rl_lkid;
3422 	lkb->lkb_exflags = rl->rl_exflags;
	/* only the low 16 flag bits travel over the wire */
3423 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3424 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3425 	lkb->lkb_lvbseq = rl->rl_lvbseq;
3426 	lkb->lkb_rqmode = rl->rl_rqmode;
3427 	lkb->lkb_grmode = rl->rl_grmode;
3428 	/* don't set lkb_status because add_lkb wants to itself */
	/* remote ast pointers are only used as non-NULL markers here */
3430 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3431 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3433 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3434 		lkb->lkb_lvbptr = allocate_lvb(ls);
3435 		if (!lkb->lkb_lvbptr)
		/* lvb bytes trail the fixed rcom/rcom_lock structures */
3437 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3438 			 sizeof(struct rcom_lock);
3439 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3442 	/* Conversions between PR and CW (middle modes) need special handling.
3443 	   The real granted mode of these converting locks cannot be determined
3444 	   until all locks have been rebuilt on the rsb (recover_conversion) */
3446 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3447 		rl->rl_status = DLM_LKSTS_CONVERT;
3448 		lkb->lkb_grmode = DLM_LOCK_IV;
3449 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
3455 /* This lkb may have been recovered in a previous aborted recovery so we need
3456    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3457    If so we just send back a standard reply.  If not, we create a new lkb with
3458    the given values and send back our lkid.  We send back our lkid by sending
3459    back the rcom_lock struct we got but with the remid field filled in. */
/* New-master side of lock recovery: rebuild one lock described by the
   rcom_lock in rc on this node's copy of the rsb.  The result code and our
   lkid are written back into rl for the reply. */
3461 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3463 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3465 	struct dlm_lkb *lkb;
	/* parent (hierarchical) locks are not supported in recovery */
3468 	if (rl->rl_parent_lkid) {
3469 		error = -EOPNOTSUPP;
3473 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	/* idempotency: a previous aborted recovery may already have added it */
3479 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3485 	error = create_lkb(ls, &lkb);
3489 	error = receive_rcom_lock_args(ls, lkb, r, rc);
3496 	add_lkb(r, lkb, rl->rl_status);
3500 	/* this is the new value returned to the lock holder for
3501 	   saving in its process-copy lkb */
3502 	rl->rl_remid = lkb->lkb_id;
3509 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3510 	rl->rl_result = error;
/* Owning-node side of lock recovery: process the new master's reply to a
   lock we sent it.  On success records the master's lkid in our
   process-copy lkb's remid; -EEXIST means the master already had the lock
   from an earlier recovery attempt.  Either way the rsb is acked so
   dlm_recover_locks() can account for the reply. */
3514 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3516 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3518 	struct dlm_lkb *lkb;
3521 	error = find_lkb(ls, rl->rl_lkid, &lkb);
3523 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3527 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
	/* master's result code travels back in the rcom_lock struct */
3529 	error = rl->rl_result;
3531 	r = lkb->lkb_resource;
3537 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
3540 		lkb->lkb_remid = rl->rl_remid;
3543 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3544 			  error, lkb->lkb_id);
3547 	/* an ack for dlm_recover_locks() which waits for replies from
3548 	   all the locks it sends to new masters */
3549 	dlm_recovered_lock(r);
/* Userspace (device interface) entry point for acquiring a new lock:
   creates an lkb, optionally allocates a kernel-side lvb buffer for the
   user's lksb, attaches the dlm_user_args as the ast param, and drives the
   normal request_lock() path.  The new lkb is also linked onto the owning
   process's per-proc lock list (with its own reference) so it can be
   cleaned up when the process closes the device. */
3558 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3559 		     int mode, uint32_t flags, void *name, unsigned int namelen,
3560 		     uint32_t parent_lkid)
3562 	struct dlm_lkb *lkb;
3563 	struct dlm_args args;
3568 	error = create_lkb(ls, &lkb);
	/* user lvb lives in a fixed-size kernel buffer copied to/from the user */
3574 	if (flags & DLM_LKF_VALBLK) {
3575 		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3576 		if (!ua->lksb.sb_lvbptr) {
3584 	/* After ua is attached to lkb it will be freed by free_lkb().
3585 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
3586 	   lock and that lkb_astparam is the dlm_user_args structure. */
3588 	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3589 			      FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3590 	lkb->lkb_flags |= DLM_IFL_USER;
	/* no mode held yet; old_mode is reported back to userspace on casts */
3591 	ua->old_mode = DLM_LOCK_IV;
3598 	error = request_lock(ls, lkb, name, namelen, &args);
3614 	/* add this new lkb to the per-process list of locks */
3615 	spin_lock(&ua->proc->locks_spin);
	/* extra ref held on behalf of the proc->locks list entry */
3616 	kref_get(&lkb->lkb_ref);
3617 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3618 	spin_unlock(&ua->proc->locks_spin);
3620 	unlock_recovery(ls);
/* Userspace entry point for converting an existing lock (lkid) to a new
   mode.  Refreshes the lkb's stored dlm_user_args from the per-call ua_tmp
   (the user may change ast params on each convert), allocates an lvb buffer
   if one is being introduced, copies in any user-supplied lvb, then drives
   the normal convert_lock() path. */
3624 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3625 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3627 	struct dlm_lkb *lkb;
3628 	struct dlm_args args;
3629 	struct dlm_user_args *ua;
3634 	error = find_lkb(ls, lkid, &lkb);
3638 	/* user can change the params on its lock when it converts it, or
3639 	   add an lvb that didn't exist before */
3641 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3643 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3644 		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3645 		if (!ua->lksb.sb_lvbptr) {
3650 	if (lvb_in && ua->lksb.sb_lvbptr)
3651 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	/* pick up possibly-changed callback params from this syscall */
3653 	ua->castparam = ua_tmp->castparam;
3654 	ua->castaddr = ua_tmp->castaddr;
3655 	ua->bastparam = ua_tmp->bastparam;
3656 	ua->bastaddr = ua_tmp->bastaddr;
	/* remember the pre-convert granted mode for reporting to userspace */
3657 	ua->old_mode = lkb->lkb_grmode;
3659 	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3660 			      FAKE_USER_AST, &args);
3664 	error = convert_lock(ls, lkb, &args);
	/* these mean the convert is queued/retryable, not a failure */
3666 	if (error == -EINPROGRESS || error == -EAGAIN)
3671 	unlock_recovery(ls);
/* Userspace entry point for unlocking lock lkid.  Copies in an optional
   user lvb, refreshes the completion-ast param, runs the normal
   unlock_lock() path, and on completion removes the lkb from the owning
   process's per-proc lock list. */
3676 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3677 		    uint32_t flags, uint32_t lkid, char *lvb_in)
3679 	struct dlm_lkb *lkb;
3680 	struct dlm_args args;
3681 	struct dlm_user_args *ua;
3686 	error = find_lkb(ls, lkid, &lkb);
3690 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3692 	if (lvb_in && ua->lksb.sb_lvbptr)
3693 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3694 	ua->castparam = ua_tmp->castparam;
3696 	error = set_unlock_args(flags, ua, &args);
3700 	error = unlock_lock(ls, lkb, &args);
	/* -DLM_EUNLOCK is the normal "unlock completed" result */
3702 	if (error == -DLM_EUNLOCK)
3707 	spin_lock(&ua->proc->locks_spin);
3708 	list_del(&lkb->lkb_ownqueue);
3709 	spin_unlock(&ua->proc->locks_spin);
3711 	/* this removes the reference for the proc->locks list added by
3717 	unlock_recovery(ls);
/* Userspace entry point for cancelling an in-progress request/convert on
   lock lkid via the normal cancel_lock() path.  If the cancelled lkb never
   held a granted mode (grmode is still IV, i.e. the original request was
   cancelled), it is dropped from the per-proc lock list. */
3721 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3722 		    uint32_t flags, uint32_t lkid)
3724 	struct dlm_lkb *lkb;
3725 	struct dlm_args args;
3726 	struct dlm_user_args *ua;
3731 	error = find_lkb(ls, lkid, &lkb);
3735 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3736 	ua->castparam = ua_tmp->castparam;
3738 	error = set_unlock_args(flags, ua, &args);
3742 	error = cancel_lock(ls, lkb, &args);
	/* -DLM_ECANCEL is the normal "cancel completed" result */
3744 	if (error == -DLM_ECANCEL)
3749 	/* this lkb was removed from the WAITING queue */
3750 	if (lkb->lkb_grmode == DLM_LOCK_IV) {
3751 		spin_lock(&ua->proc->locks_spin);
3752 		list_del(&lkb->lkb_ownqueue);
3753 		spin_unlock(&ua->proc->locks_spin);
3759 	unlock_recovery(ls);
/* Detach a PERSISTENT lock from its exiting process: free the kernel-side
   lvb buffer and drop the dlm_user_args linkage so the lock survives as an
   orphan.  Called from dlm_clear_proc_locks(). */
3763 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3765 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3767 	if (ua->lksb.sb_lvbptr)
3768 		kfree(ua->lksb.sb_lvbptr);
3770 	lkb->lkb_astparam = (long)NULL;
3772 	/* TODO: propagate to master if needed */
3776 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3777    Regardless of what rsb queue the lock is on, it's removed and freed. */
/* Force-unlock one lock belonging to an exiting process; companion to
   orphan_proc_lock() for non-PERSISTENT locks in dlm_clear_proc_locks(). */
3779 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3781 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3782 	struct dlm_args args;
3785 	/* FIXME: we need to handle the case where the lkb is in limbo
3786 	   while the rsb is being looked up, currently we assert in
3787 	   _unlock_lock/is_remote because rsb nodeid is -1. */
3789 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3791 	error = unlock_lock(ls, lkb, &args);
	/* forced unlock completing normally is not an error */
3792 	if (error == -DLM_EUNLOCK)
3797 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3798    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3799    which we clear here. */
3801 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3802    list, and no more device_writes should add lkb's to proc->locks list; so we
3803    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3804    device reads/writes/closes are serialized -- FIXME: we may need to serialize
/* Tear down all locks owned by a closing userspace process: dequeue any
   pending asts, then either orphan (PERSISTENT) or force-unlock each lock
   on the proc's list. */
3807 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3809 	struct dlm_lkb *lkb, *safe;
3812 	mutex_lock(&ls->ls_clear_proc_locks);
3814 	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
		/* a pending ast delivery holds the lkb on proc->asts */
3815 		if (lkb->lkb_ast_type) {
3816 			list_del(&lkb->lkb_astqueue);
3820 		list_del(&lkb->lkb_ownqueue);
3822 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3823 			lkb->lkb_flags |= DLM_IFL_ORPHAN;
3824 			orphan_proc_lock(ls, lkb);
3826 			lkb->lkb_flags |= DLM_IFL_DEAD;
3827 			unlock_proc_lock(ls, lkb);
3830 		/* this removes the reference for the proc->locks list
3831 		   added by dlm_user_request, it may result in the lkb
3836 	mutex_unlock(&ls->ls_clear_proc_locks);
3837 	unlock_recovery(ls);