]> err.no Git - linux-2.6/blob - fs/dlm/lock.c
[DLM] down conversion clearing flags
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88
89 #define FAKE_USER_AST (void*)0xff00ff00
90
91 /*
92  * Lock compatibilty matrix - thanks Steve
93  * UN = Unlocked state. Not really a state, used as a flag
94  * PD = Padding. Used to make the matrix a nice power of two in size
95  * Other states are the same as the VMS DLM.
96  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
97  */
98
99 static const int __dlm_compat_matrix[8][8] = {
100       /* UN NL CR CW PR PW EX PD */
101         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
103         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
104         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
105         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
106         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
107         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
108         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
109 };
110
111 /*
112  * This defines the direction of transfer of LVB data.
113  * Granted mode is the row; requested mode is the column.
114  * Usage: matrix[grmode+1][rqmode+1]
115  * 1 = LVB is returned to the caller
116  * 0 = LVB is written to the resource
117  * -1 = nothing happens to the LVB
118  */
119
120 const int dlm_lvb_operations[8][8] = {
121         /* UN   NL  CR  CW  PR  PW  EX  PD*/
122         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
123         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
124         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
125         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
126         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
127         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
128         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
130 };
131
132 #define modes_compat(gr, rq) \
133         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
134
135 int dlm_modes_compat(int mode1, int mode2)
136 {
137         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
138 }
139
140 /*
141  * Compatibility matrix for conversions with QUECVT set.
142  * Granted mode is the row; requested mode is the column.
143  * Usage: matrix[grmode+1][rqmode+1]
144  */
145
146 static const int __quecvt_compat_matrix[8][8] = {
147       /* UN NL CR CW PR PW EX PD */
148         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
149         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
150         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
151         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
152         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
153         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
154         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
155         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
156 };
157
158 void dlm_print_lkb(struct dlm_lkb *lkb)
159 {
160         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
161                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
162                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
163                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
164                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
165 }
166
167 void dlm_print_rsb(struct dlm_rsb *r)
168 {
169         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
170                r->res_nodeid, r->res_flags, r->res_first_lkid,
171                r->res_recover_locks_count, r->res_name);
172 }
173
174 void dlm_dump_rsb(struct dlm_rsb *r)
175 {
176         struct dlm_lkb *lkb;
177
178         dlm_print_rsb(r);
179
180         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
181                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
182         printk(KERN_ERR "rsb lookup list\n");
183         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
184                 dlm_print_lkb(lkb);
185         printk(KERN_ERR "rsb grant queue:\n");
186         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
187                 dlm_print_lkb(lkb);
188         printk(KERN_ERR "rsb convert queue:\n");
189         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb wait queue:\n");
192         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194 }
195
196 /* Threads cannot use the lockspace while it's being recovered */
197
198 static inline void lock_recovery(struct dlm_ls *ls)
199 {
200         down_read(&ls->ls_in_recovery);
201 }
202
203 static inline void unlock_recovery(struct dlm_ls *ls)
204 {
205         up_read(&ls->ls_in_recovery);
206 }
207
208 static inline int lock_recovery_try(struct dlm_ls *ls)
209 {
210         return down_read_trylock(&ls->ls_in_recovery);
211 }
212
213 static inline int can_be_queued(struct dlm_lkb *lkb)
214 {
215         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
216 }
217
218 static inline int force_blocking_asts(struct dlm_lkb *lkb)
219 {
220         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
221 }
222
223 static inline int is_demoted(struct dlm_lkb *lkb)
224 {
225         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
226 }
227
228 static inline int is_remote(struct dlm_rsb *r)
229 {
230         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
231         return !!r->res_nodeid;
232 }
233
234 static inline int is_process_copy(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
237 }
238
239 static inline int is_master_copy(struct dlm_lkb *lkb)
240 {
241         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
242                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
243         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
244 }
245
246 static inline int middle_conversion(struct dlm_lkb *lkb)
247 {
248         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
249             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
250                 return 1;
251         return 0;
252 }
253
254 static inline int down_conversion(struct dlm_lkb *lkb)
255 {
256         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
257 }
258
259 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
260 {
261         if (is_master_copy(lkb))
262                 return;
263
264         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
265
266         lkb->lkb_lksb->sb_status = rv;
267         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
268
269         dlm_add_ast(lkb, AST_COMP);
270 }
271
272 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
273 {
274         if (is_master_copy(lkb))
275                 send_bast(r, lkb, rqmode);
276         else {
277                 lkb->lkb_bastmode = rqmode;
278                 dlm_add_ast(lkb, AST_BAST);
279         }
280 }
281
282 /*
283  * Basic operations on rsb's and lkb's
284  */
285
286 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
287 {
288         struct dlm_rsb *r;
289
290         r = allocate_rsb(ls, len);
291         if (!r)
292                 return NULL;
293
294         r->res_ls = ls;
295         r->res_length = len;
296         memcpy(r->res_name, name, len);
297         mutex_init(&r->res_mutex);
298
299         INIT_LIST_HEAD(&r->res_lookup);
300         INIT_LIST_HEAD(&r->res_grantqueue);
301         INIT_LIST_HEAD(&r->res_convertqueue);
302         INIT_LIST_HEAD(&r->res_waitqueue);
303         INIT_LIST_HEAD(&r->res_root_list);
304         INIT_LIST_HEAD(&r->res_recover_list);
305
306         return r;
307 }
308
309 static int search_rsb_list(struct list_head *head, char *name, int len,
310                            unsigned int flags, struct dlm_rsb **r_ret)
311 {
312         struct dlm_rsb *r;
313         int error = 0;
314
315         list_for_each_entry(r, head, res_hashchain) {
316                 if (len == r->res_length && !memcmp(name, r->res_name, len))
317                         goto found;
318         }
319         return -EBADR;
320
321  found:
322         if (r->res_nodeid && (flags & R_MASTER))
323                 error = -ENOTBLK;
324         *r_ret = r;
325         return error;
326 }
327
328 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
329                        unsigned int flags, struct dlm_rsb **r_ret)
330 {
331         struct dlm_rsb *r;
332         int error;
333
334         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
335         if (!error) {
336                 kref_get(&r->res_ref);
337                 goto out;
338         }
339         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
340         if (error)
341                 goto out;
342
343         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
344
345         if (dlm_no_directory(ls))
346                 goto out;
347
348         if (r->res_nodeid == -1) {
349                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
350                 r->res_first_lkid = 0;
351         } else if (r->res_nodeid > 0) {
352                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
353                 r->res_first_lkid = 0;
354         } else {
355                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
356                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
357         }
358  out:
359         *r_ret = r;
360         return error;
361 }
362
363 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
364                       unsigned int flags, struct dlm_rsb **r_ret)
365 {
366         int error;
367         write_lock(&ls->ls_rsbtbl[b].lock);
368         error = _search_rsb(ls, name, len, b, flags, r_ret);
369         write_unlock(&ls->ls_rsbtbl[b].lock);
370         return error;
371 }
372
373 /*
374  * Find rsb in rsbtbl and potentially create/add one
375  *
376  * Delaying the release of rsb's has a similar benefit to applications keeping
377  * NL locks on an rsb, but without the guarantee that the cached master value
378  * will still be valid when the rsb is reused.  Apps aren't always smart enough
379  * to keep NL locks on an rsb that they may lock again shortly; this can lead
380  * to excessive master lookups and removals if we don't delay the release.
381  *
382  * Searching for an rsb means looking through both the normal list and toss
383  * list.  When found on the toss list the rsb is moved to the normal list with
384  * ref count of 1; when found on normal list the ref count is incremented.
385  */
386
387 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
388                     unsigned int flags, struct dlm_rsb **r_ret)
389 {
390         struct dlm_rsb *r, *tmp;
391         uint32_t hash, bucket;
392         int error = 0;
393
394         if (dlm_no_directory(ls))
395                 flags |= R_CREATE;
396
397         hash = jhash(name, namelen, 0);
398         bucket = hash & (ls->ls_rsbtbl_size - 1);
399
400         error = search_rsb(ls, name, namelen, bucket, flags, &r);
401         if (!error)
402                 goto out;
403
404         if (error == -EBADR && !(flags & R_CREATE))
405                 goto out;
406
407         /* the rsb was found but wasn't a master copy */
408         if (error == -ENOTBLK)
409                 goto out;
410
411         error = -ENOMEM;
412         r = create_rsb(ls, name, namelen);
413         if (!r)
414                 goto out;
415
416         r->res_hash = hash;
417         r->res_bucket = bucket;
418         r->res_nodeid = -1;
419         kref_init(&r->res_ref);
420
421         /* With no directory, the master can be set immediately */
422         if (dlm_no_directory(ls)) {
423                 int nodeid = dlm_dir_nodeid(r);
424                 if (nodeid == dlm_our_nodeid())
425                         nodeid = 0;
426                 r->res_nodeid = nodeid;
427         }
428
429         write_lock(&ls->ls_rsbtbl[bucket].lock);
430         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
431         if (!error) {
432                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
433                 free_rsb(r);
434                 r = tmp;
435                 goto out;
436         }
437         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
438         write_unlock(&ls->ls_rsbtbl[bucket].lock);
439         error = 0;
440  out:
441         *r_ret = r;
442         return error;
443 }
444
445 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
446                  unsigned int flags, struct dlm_rsb **r_ret)
447 {
448         return find_rsb(ls, name, namelen, flags, r_ret);
449 }
450
451 /* This is only called to add a reference when the code already holds
452    a valid reference to the rsb, so there's no need for locking. */
453
454 static inline void hold_rsb(struct dlm_rsb *r)
455 {
456         kref_get(&r->res_ref);
457 }
458
459 void dlm_hold_rsb(struct dlm_rsb *r)
460 {
461         hold_rsb(r);
462 }
463
464 static void toss_rsb(struct kref *kref)
465 {
466         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
467         struct dlm_ls *ls = r->res_ls;
468
469         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
470         kref_init(&r->res_ref);
471         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
472         r->res_toss_time = jiffies;
473         if (r->res_lvbptr) {
474                 free_lvb(r->res_lvbptr);
475                 r->res_lvbptr = NULL;
476         }
477 }
478
479 /* When all references to the rsb are gone it's transfered to
480    the tossed list for later disposal. */
481
482 static void put_rsb(struct dlm_rsb *r)
483 {
484         struct dlm_ls *ls = r->res_ls;
485         uint32_t bucket = r->res_bucket;
486
487         write_lock(&ls->ls_rsbtbl[bucket].lock);
488         kref_put(&r->res_ref, toss_rsb);
489         write_unlock(&ls->ls_rsbtbl[bucket].lock);
490 }
491
492 void dlm_put_rsb(struct dlm_rsb *r)
493 {
494         put_rsb(r);
495 }
496
497 /* See comment for unhold_lkb */
498
499 static void unhold_rsb(struct dlm_rsb *r)
500 {
501         int rv;
502         rv = kref_put(&r->res_ref, toss_rsb);
503         DLM_ASSERT(!rv, dlm_dump_rsb(r););
504 }
505
506 static void kill_rsb(struct kref *kref)
507 {
508         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
509
510         /* All work is done after the return from kref_put() so we
511            can release the write_lock before the remove and free. */
512
513         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
514         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
515         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
516         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
517         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
518         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
519 }
520
521 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
522    The rsb must exist as long as any lkb's for it do. */
523
524 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
525 {
526         hold_rsb(r);
527         lkb->lkb_resource = r;
528 }
529
530 static void detach_lkb(struct dlm_lkb *lkb)
531 {
532         if (lkb->lkb_resource) {
533                 put_rsb(lkb->lkb_resource);
534                 lkb->lkb_resource = NULL;
535         }
536 }
537
538 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
539 {
540         struct dlm_lkb *lkb, *tmp;
541         uint32_t lkid = 0;
542         uint16_t bucket;
543
544         lkb = allocate_lkb(ls);
545         if (!lkb)
546                 return -ENOMEM;
547
548         lkb->lkb_nodeid = -1;
549         lkb->lkb_grmode = DLM_LOCK_IV;
550         kref_init(&lkb->lkb_ref);
551         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
552
553         get_random_bytes(&bucket, sizeof(bucket));
554         bucket &= (ls->ls_lkbtbl_size - 1);
555
556         write_lock(&ls->ls_lkbtbl[bucket].lock);
557
558         /* counter can roll over so we must verify lkid is not in use */
559
560         while (lkid == 0) {
561                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
562
563                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
564                                     lkb_idtbl_list) {
565                         if (tmp->lkb_id != lkid)
566                                 continue;
567                         lkid = 0;
568                         break;
569                 }
570         }
571
572         lkb->lkb_id = lkid;
573         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
574         write_unlock(&ls->ls_lkbtbl[bucket].lock);
575
576         *lkb_ret = lkb;
577         return 0;
578 }
579
580 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
581 {
582         uint16_t bucket = lkid & 0xFFFF;
583         struct dlm_lkb *lkb;
584
585         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
586                 if (lkb->lkb_id == lkid)
587                         return lkb;
588         }
589         return NULL;
590 }
591
592 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
593 {
594         struct dlm_lkb *lkb;
595         uint16_t bucket = lkid & 0xFFFF;
596
597         if (bucket >= ls->ls_lkbtbl_size)
598                 return -EBADSLT;
599
600         read_lock(&ls->ls_lkbtbl[bucket].lock);
601         lkb = __find_lkb(ls, lkid);
602         if (lkb)
603                 kref_get(&lkb->lkb_ref);
604         read_unlock(&ls->ls_lkbtbl[bucket].lock);
605
606         *lkb_ret = lkb;
607         return lkb ? 0 : -ENOENT;
608 }
609
610 static void kill_lkb(struct kref *kref)
611 {
612         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
613
614         /* All work is done after the return from kref_put() so we
615            can release the write_lock before the detach_lkb */
616
617         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
618 }
619
620 /* __put_lkb() is used when an lkb may not have an rsb attached to
621    it so we need to provide the lockspace explicitly */
622
623 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
624 {
625         uint16_t bucket = lkb->lkb_id & 0xFFFF;
626
627         write_lock(&ls->ls_lkbtbl[bucket].lock);
628         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
629                 list_del(&lkb->lkb_idtbl_list);
630                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
631
632                 detach_lkb(lkb);
633
634                 /* for local/process lkbs, lvbptr points to caller's lksb */
635                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
636                         free_lvb(lkb->lkb_lvbptr);
637                 free_lkb(lkb);
638                 return 1;
639         } else {
640                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
641                 return 0;
642         }
643 }
644
645 int dlm_put_lkb(struct dlm_lkb *lkb)
646 {
647         struct dlm_ls *ls;
648
649         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
650         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
651
652         ls = lkb->lkb_resource->res_ls;
653         return __put_lkb(ls, lkb);
654 }
655
656 /* This is only called to add a reference when the code already holds
657    a valid reference to the lkb, so there's no need for locking. */
658
659 static inline void hold_lkb(struct dlm_lkb *lkb)
660 {
661         kref_get(&lkb->lkb_ref);
662 }
663
664 /* This is called when we need to remove a reference and are certain
665    it's not the last ref.  e.g. del_lkb is always called between a
666    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
667    put_lkb would work fine, but would involve unnecessary locking */
668
669 static inline void unhold_lkb(struct dlm_lkb *lkb)
670 {
671         int rv;
672         rv = kref_put(&lkb->lkb_ref, kill_lkb);
673         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
674 }
675
676 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
677                             int mode)
678 {
679         struct dlm_lkb *lkb = NULL;
680
681         list_for_each_entry(lkb, head, lkb_statequeue)
682                 if (lkb->lkb_rqmode < mode)
683                         break;
684
685         if (!lkb)
686                 list_add_tail(new, head);
687         else
688                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
689 }
690
691 /* add/remove lkb to rsb's grant/convert/wait queue */
692
693 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
694 {
695         kref_get(&lkb->lkb_ref);
696
697         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
698
699         lkb->lkb_status = status;
700
701         switch (status) {
702         case DLM_LKSTS_WAITING:
703                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
704                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
705                 else
706                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
707                 break;
708         case DLM_LKSTS_GRANTED:
709                 /* convention says granted locks kept in order of grmode */
710                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
711                                 lkb->lkb_grmode);
712                 break;
713         case DLM_LKSTS_CONVERT:
714                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
715                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
716                 else
717                         list_add_tail(&lkb->lkb_statequeue,
718                                       &r->res_convertqueue);
719                 break;
720         default:
721                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
722         }
723 }
724
725 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
726 {
727         lkb->lkb_status = 0;
728         list_del(&lkb->lkb_statequeue);
729         unhold_lkb(lkb);
730 }
731
732 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
733 {
734         hold_lkb(lkb);
735         del_lkb(r, lkb);
736         add_lkb(r, lkb, sts);
737         unhold_lkb(lkb);
738 }
739
740 /* add/remove lkb from global waiters list of lkb's waiting for
741    a reply from a remote node */
742
743 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
744 {
745         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
746
747         mutex_lock(&ls->ls_waiters_mutex);
748         if (lkb->lkb_wait_type) {
749                 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
750                 goto out;
751         }
752         lkb->lkb_wait_type = mstype;
753         kref_get(&lkb->lkb_ref);
754         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
755  out:
756         mutex_unlock(&ls->ls_waiters_mutex);
757 }
758
759 static int _remove_from_waiters(struct dlm_lkb *lkb)
760 {
761         int error = 0;
762
763         if (!lkb->lkb_wait_type) {
764                 log_print("remove_from_waiters error");
765                 error = -EINVAL;
766                 goto out;
767         }
768         lkb->lkb_wait_type = 0;
769         list_del(&lkb->lkb_wait_reply);
770         unhold_lkb(lkb);
771  out:
772         return error;
773 }
774
775 static int remove_from_waiters(struct dlm_lkb *lkb)
776 {
777         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
778         int error;
779
780         mutex_lock(&ls->ls_waiters_mutex);
781         error = _remove_from_waiters(lkb);
782         mutex_unlock(&ls->ls_waiters_mutex);
783         return error;
784 }
785
786 static void dir_remove(struct dlm_rsb *r)
787 {
788         int to_nodeid;
789
790         if (dlm_no_directory(r->res_ls))
791                 return;
792
793         to_nodeid = dlm_dir_nodeid(r);
794         if (to_nodeid != dlm_our_nodeid())
795                 send_remove(r);
796         else
797                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
798                                      r->res_name, r->res_length);
799 }
800
801 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
802    found since they are in order of newest to oldest? */
803
804 static int shrink_bucket(struct dlm_ls *ls, int b)
805 {
806         struct dlm_rsb *r;
807         int count = 0, found;
808
809         for (;;) {
810                 found = 0;
811                 write_lock(&ls->ls_rsbtbl[b].lock);
812                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
813                                             res_hashchain) {
814                         if (!time_after_eq(jiffies, r->res_toss_time +
815                                            dlm_config.toss_secs * HZ))
816                                 continue;
817                         found = 1;
818                         break;
819                 }
820
821                 if (!found) {
822                         write_unlock(&ls->ls_rsbtbl[b].lock);
823                         break;
824                 }
825
826                 if (kref_put(&r->res_ref, kill_rsb)) {
827                         list_del(&r->res_hashchain);
828                         write_unlock(&ls->ls_rsbtbl[b].lock);
829
830                         if (is_master(r))
831                                 dir_remove(r);
832                         free_rsb(r);
833                         count++;
834                 } else {
835                         write_unlock(&ls->ls_rsbtbl[b].lock);
836                         log_error(ls, "tossed rsb in use %s", r->res_name);
837                 }
838         }
839
840         return count;
841 }
842
843 void dlm_scan_rsbs(struct dlm_ls *ls)
844 {
845         int i;
846
847         if (dlm_locking_stopped(ls))
848                 return;
849
850         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
851                 shrink_bucket(ls, i);
852                 cond_resched();
853         }
854 }
855
856 /* lkb is master or local copy */
857
858 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
859 {
860         int b, len = r->res_ls->ls_lvblen;
861
862         /* b=1 lvb returned to caller
863            b=0 lvb written to rsb or invalidated
864            b=-1 do nothing */
865
866         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
867
868         if (b == 1) {
869                 if (!lkb->lkb_lvbptr)
870                         return;
871
872                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
873                         return;
874
875                 if (!r->res_lvbptr)
876                         return;
877
878                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
879                 lkb->lkb_lvbseq = r->res_lvbseq;
880
881         } else if (b == 0) {
882                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
883                         rsb_set_flag(r, RSB_VALNOTVALID);
884                         return;
885                 }
886
887                 if (!lkb->lkb_lvbptr)
888                         return;
889
890                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
891                         return;
892
893                 if (!r->res_lvbptr)
894                         r->res_lvbptr = allocate_lvb(r->res_ls);
895
896                 if (!r->res_lvbptr)
897                         return;
898
899                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
900                 r->res_lvbseq++;
901                 lkb->lkb_lvbseq = r->res_lvbseq;
902                 rsb_clear_flag(r, RSB_VALNOTVALID);
903         }
904
905         if (rsb_flag(r, RSB_VALNOTVALID))
906                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
907 }
908
909 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
910 {
911         if (lkb->lkb_grmode < DLM_LOCK_PW)
912                 return;
913
914         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
915                 rsb_set_flag(r, RSB_VALNOTVALID);
916                 return;
917         }
918
919         if (!lkb->lkb_lvbptr)
920                 return;
921
922         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
923                 return;
924
925         if (!r->res_lvbptr)
926                 r->res_lvbptr = allocate_lvb(r->res_ls);
927
928         if (!r->res_lvbptr)
929                 return;
930
931         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
932         r->res_lvbseq++;
933         rsb_clear_flag(r, RSB_VALNOTVALID);
934 }
935
936 /* lkb is process copy (pc) */
937
938 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
939                             struct dlm_message *ms)
940 {
941         int b;
942
943         if (!lkb->lkb_lvbptr)
944                 return;
945
946         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
947                 return;
948
949         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
950         if (b == 1) {
951                 int len = receive_extralen(ms);
952                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
953                 lkb->lkb_lvbseq = ms->m_lvbseq;
954         }
955 }
956
957 /* Manipulate lkb's on rsb's convert/granted/waiting queues
958    remove_lock -- used for unlock, removes lkb from granted
959    revert_lock -- used for cancel, moves lkb from convert to granted
960    grant_lock  -- used for request and convert, adds lkb to granted or
961                   moves lkb from convert or waiting to granted
962
963    Each of these is used for master or local copy lkb's.  There is
964    also a _pc() variation used to make the corresponding change on
965    a process copy (pc) lkb. */
966
967 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
968 {
969         del_lkb(r, lkb);
970         lkb->lkb_grmode = DLM_LOCK_IV;
971         /* this unhold undoes the original ref from create_lkb()
972            so this leads to the lkb being freed */
973         unhold_lkb(lkb);
974 }
975
976 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
977 {
978         set_lvb_unlock(r, lkb);
979         _remove_lock(r, lkb);
980 }
981
982 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
983 {
984         _remove_lock(r, lkb);
985 }
986
987 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
988 {
989         lkb->lkb_rqmode = DLM_LOCK_IV;
990
991         switch (lkb->lkb_status) {
992         case DLM_LKSTS_GRANTED:
993                 break;
994         case DLM_LKSTS_CONVERT:
995                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
996                 break;
997         case DLM_LKSTS_WAITING:
998                 del_lkb(r, lkb);
999                 lkb->lkb_grmode = DLM_LOCK_IV;
1000                 /* this unhold undoes the original ref from create_lkb()
1001                    so this leads to the lkb being freed */
1002                 unhold_lkb(lkb);
1003                 break;
1004         default:
1005                 log_print("invalid status for revert %d", lkb->lkb_status);
1006         }
1007 }
1008
1009 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1010 {
1011         revert_lock(r, lkb);
1012 }
1013
1014 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1015 {
1016         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1017                 lkb->lkb_grmode = lkb->lkb_rqmode;
1018                 if (lkb->lkb_status)
1019                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1020                 else
1021                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1022         }
1023
1024         lkb->lkb_rqmode = DLM_LOCK_IV;
1025 }
1026
1027 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1028 {
1029         set_lvb_lock(r, lkb);
1030         _grant_lock(r, lkb);
1031         lkb->lkb_highbast = 0;
1032 }
1033
1034 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1035                           struct dlm_message *ms)
1036 {
1037         set_lvb_lock_pc(r, lkb, ms);
1038         _grant_lock(r, lkb);
1039 }
1040
1041 /* called by grant_pending_locks() which means an async grant message must
1042    be sent to the requesting node in addition to granting the lock if the
1043    lkb belongs to a remote node. */
1044
1045 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1046 {
1047         grant_lock(r, lkb);
1048         if (is_master_copy(lkb))
1049                 send_grant(r, lkb);
1050         else
1051                 queue_cast(r, lkb, 0);
1052 }
1053
1054 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1055 {
1056         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1057                                            lkb_statequeue);
1058         if (lkb->lkb_id == first->lkb_id)
1059                 return 1;
1060
1061         return 0;
1062 }
1063
1064 /* Check if the given lkb conflicts with another lkb on the queue. */
1065
1066 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1067 {
1068         struct dlm_lkb *this;
1069
1070         list_for_each_entry(this, head, lkb_statequeue) {
1071                 if (this == lkb)
1072                         continue;
1073                 if (!modes_compat(this, lkb))
1074                         return 1;
1075         }
1076         return 0;
1077 }
1078
1079 /*
1080  * "A conversion deadlock arises with a pair of lock requests in the converting
1081  * queue for one resource.  The granted mode of each lock blocks the requested
1082  * mode of the other lock."
1083  *
1084  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1085  * convert queue from being granted, then demote lkb (set grmode to NL).
1086  * This second form requires that we check for conv-deadlk even when
1087  * now == 0 in _can_be_granted().
1088  *
1089  * Example:
1090  * Granted Queue: empty
1091  * Convert Queue: NL->EX (first lock)
1092  *                PR->EX (second lock)
1093  *
1094  * The first lock can't be granted because of the granted mode of the second
1095  * lock and the second lock can't be granted because it's not first in the
1096  * list.  We demote the granted mode of the second lock (the lkb passed to this
1097  * function).
1098  *
1099  * After the resolution, the "grant pending" function needs to go back and try
1100  * to grant locks on the convert queue again since the first lock can now be
1101  * granted.
1102  */
1103
1104 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1105 {
1106         struct dlm_lkb *this, *first = NULL, *self = NULL;
1107
1108         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1109                 if (!first)
1110                         first = this;
1111                 if (this == lkb) {
1112                         self = lkb;
1113                         continue;
1114                 }
1115
1116                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1117                         return 1;
1118         }
1119
1120         /* if lkb is on the convert queue and is preventing the first
1121            from being granted, then there's deadlock and we demote lkb.
1122            multiple converting locks may need to do this before the first
1123            converting lock can be granted. */
1124
1125         if (self && self != first) {
1126                 if (!modes_compat(lkb, first) &&
1127                     !queue_conflict(&rsb->res_grantqueue, first))
1128                         return 1;
1129         }
1130
1131         return 0;
1132 }
1133
1134 /*
1135  * Return 1 if the lock can be granted, 0 otherwise.
1136  * Also detect and resolve conversion deadlocks.
1137  *
1138  * lkb is the lock to be granted
1139  *
1140  * now is 1 if the function is being called in the context of the
1141  * immediate request, it is 0 if called later, after the lock has been
1142  * queued.
1143  *
1144  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1145  */
1146
1147 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1148 {
1149         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1150
1151         /*
1152          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1153          * a new request for a NL mode lock being blocked.
1154          *
1155          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1156          * request, then it would be granted.  In essence, the use of this flag
1157          * tells the Lock Manager to expedite theis request by not considering
1158          * what may be in the CONVERTING or WAITING queues...  As of this
1159          * writing, the EXPEDITE flag can be used only with new requests for NL
1160          * mode locks.  This flag is not valid for conversion requests.
1161          *
1162          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1163          * conversion or used with a non-NL requested mode.  We also know an
1164          * EXPEDITE request is always granted immediately, so now must always
1165          * be 1.  The full condition to grant an expedite request: (now &&
1166          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1167          * therefore be shortened to just checking the flag.
1168          */
1169
1170         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1171                 return 1;
1172
1173         /*
1174          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1175          * added to the remaining conditions.
1176          */
1177
1178         if (queue_conflict(&r->res_grantqueue, lkb))
1179                 goto out;
1180
1181         /*
1182          * 6-3: By default, a conversion request is immediately granted if the
1183          * requested mode is compatible with the modes of all other granted
1184          * locks
1185          */
1186
1187         if (queue_conflict(&r->res_convertqueue, lkb))
1188                 goto out;
1189
1190         /*
1191          * 6-5: But the default algorithm for deciding whether to grant or
1192          * queue conversion requests does not by itself guarantee that such
1193          * requests are serviced on a "first come first serve" basis.  This, in
1194          * turn, can lead to a phenomenon known as "indefinate postponement".
1195          *
1196          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1197          * the system service employed to request a lock conversion.  This flag
1198          * forces certain conversion requests to be queued, even if they are
1199          * compatible with the granted modes of other locks on the same
1200          * resource.  Thus, the use of this flag results in conversion requests
1201          * being ordered on a "first come first servce" basis.
1202          *
1203          * DCT: This condition is all about new conversions being able to occur
1204          * "in place" while the lock remains on the granted queue (assuming
1205          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1206          * doesn't _have_ to go onto the convert queue where it's processed in
1207          * order.  The "now" variable is necessary to distinguish converts
1208          * being received and processed for the first time now, because once a
1209          * convert is moved to the conversion queue the condition below applies
1210          * requiring fifo granting.
1211          */
1212
1213         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1214                 return 1;
1215
1216         /*
1217          * The NOORDER flag is set to avoid the standard vms rules on grant
1218          * order.
1219          */
1220
1221         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1222                 return 1;
1223
1224         /*
1225          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1226          * granted until all other conversion requests ahead of it are granted
1227          * and/or canceled.
1228          */
1229
1230         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1231                 return 1;
1232
1233         /*
1234          * 6-4: By default, a new request is immediately granted only if all
1235          * three of the following conditions are satisfied when the request is
1236          * issued:
1237          * - The queue of ungranted conversion requests for the resource is
1238          *   empty.
1239          * - The queue of ungranted new requests for the resource is empty.
1240          * - The mode of the new request is compatible with the most
1241          *   restrictive mode of all granted locks on the resource.
1242          */
1243
1244         if (now && !conv && list_empty(&r->res_convertqueue) &&
1245             list_empty(&r->res_waitqueue))
1246                 return 1;
1247
1248         /*
1249          * 6-4: Once a lock request is in the queue of ungranted new requests,
1250          * it cannot be granted until the queue of ungranted conversion
1251          * requests is empty, all ungranted new requests ahead of it are
1252          * granted and/or canceled, and it is compatible with the granted mode
1253          * of the most restrictive lock granted on the resource.
1254          */
1255
1256         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1257             first_in_list(lkb, &r->res_waitqueue))
1258                 return 1;
1259
1260  out:
1261         /*
1262          * The following, enabled by CONVDEADLK, departs from VMS.
1263          */
1264
1265         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1266             conversion_deadlock_detect(r, lkb)) {
1267                 lkb->lkb_grmode = DLM_LOCK_NL;
1268                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1269         }
1270
1271         return 0;
1272 }
1273
1274 /*
1275  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1276  * simple way to provide a big optimization to applications that can use them.
1277  */
1278
1279 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1280 {
1281         uint32_t flags = lkb->lkb_exflags;
1282         int rv;
1283         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1284
1285         rv = _can_be_granted(r, lkb, now);
1286         if (rv)
1287                 goto out;
1288
1289         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1290                 goto out;
1291
1292         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1293                 alt = DLM_LOCK_PR;
1294         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1295                 alt = DLM_LOCK_CW;
1296
1297         if (alt) {
1298                 lkb->lkb_rqmode = alt;
1299                 rv = _can_be_granted(r, lkb, now);
1300                 if (rv)
1301                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1302                 else
1303                         lkb->lkb_rqmode = rqmode;
1304         }
1305  out:
1306         return rv;
1307 }
1308
1309 static int grant_pending_convert(struct dlm_rsb *r, int high)
1310 {
1311         struct dlm_lkb *lkb, *s;
1312         int hi, demoted, quit, grant_restart, demote_restart;
1313
1314         quit = 0;
1315  restart:
1316         grant_restart = 0;
1317         demote_restart = 0;
1318         hi = DLM_LOCK_IV;
1319
1320         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1321                 demoted = is_demoted(lkb);
1322                 if (can_be_granted(r, lkb, 0)) {
1323                         grant_lock_pending(r, lkb);
1324                         grant_restart = 1;
1325                 } else {
1326                         hi = max_t(int, lkb->lkb_rqmode, hi);
1327                         if (!demoted && is_demoted(lkb))
1328                                 demote_restart = 1;
1329                 }
1330         }
1331
1332         if (grant_restart)
1333                 goto restart;
1334         if (demote_restart && !quit) {
1335                 quit = 1;
1336                 goto restart;
1337         }
1338
1339         return max_t(int, high, hi);
1340 }
1341
1342 static int grant_pending_wait(struct dlm_rsb *r, int high)
1343 {
1344         struct dlm_lkb *lkb, *s;
1345
1346         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1347                 if (can_be_granted(r, lkb, 0))
1348                         grant_lock_pending(r, lkb);
1349                 else
1350                         high = max_t(int, lkb->lkb_rqmode, high);
1351         }
1352
1353         return high;
1354 }
1355
1356 static void grant_pending_locks(struct dlm_rsb *r)
1357 {
1358         struct dlm_lkb *lkb, *s;
1359         int high = DLM_LOCK_IV;
1360
1361         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1362
1363         high = grant_pending_convert(r, high);
1364         high = grant_pending_wait(r, high);
1365
1366         if (high == DLM_LOCK_IV)
1367                 return;
1368
1369         /*
1370          * If there are locks left on the wait/convert queue then send blocking
1371          * ASTs to granted locks based on the largest requested mode (high)
1372          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1373          */
1374
1375         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1376                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1377                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1378                         queue_bast(r, lkb, high);
1379                         lkb->lkb_highbast = high;
1380                 }
1381         }
1382 }
1383
1384 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1385                             struct dlm_lkb *lkb)
1386 {
1387         struct dlm_lkb *gr;
1388
1389         list_for_each_entry(gr, head, lkb_statequeue) {
1390                 if (gr->lkb_bastaddr &&
1391                     gr->lkb_highbast < lkb->lkb_rqmode &&
1392                     !modes_compat(gr, lkb)) {
1393                         queue_bast(r, gr, lkb->lkb_rqmode);
1394                         gr->lkb_highbast = lkb->lkb_rqmode;
1395                 }
1396         }
1397 }
1398
1399 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1400 {
1401         send_bast_queue(r, &r->res_grantqueue, lkb);
1402 }
1403
1404 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1405 {
1406         send_bast_queue(r, &r->res_grantqueue, lkb);
1407         send_bast_queue(r, &r->res_convertqueue, lkb);
1408 }
1409
1410 /* set_master(r, lkb) -- set the master nodeid of a resource
1411
1412    The purpose of this function is to set the nodeid field in the given
1413    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1414    known, it can just be copied to the lkb and the function will return
1415    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1416    before it can be copied to the lkb.
1417
1418    When the rsb nodeid is being looked up remotely, the initial lkb
1419    causing the lookup is kept on the ls_waiters list waiting for the
1420    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1421    on the rsb's res_lookup list until the master is verified.
1422
1423    Return values:
1424    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1425    1: the rsb master is not available and the lkb has been placed on
1426       a wait queue
1427 */
1428
1429 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1430 {
1431         struct dlm_ls *ls = r->res_ls;
1432         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1433
1434         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1435                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1436                 r->res_first_lkid = lkb->lkb_id;
1437                 lkb->lkb_nodeid = r->res_nodeid;
1438                 return 0;
1439         }
1440
1441         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1442                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1443                 return 1;
1444         }
1445
1446         if (r->res_nodeid == 0) {
1447                 lkb->lkb_nodeid = 0;
1448                 return 0;
1449         }
1450
1451         if (r->res_nodeid > 0) {
1452                 lkb->lkb_nodeid = r->res_nodeid;
1453                 return 0;
1454         }
1455
1456         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1457
1458         dir_nodeid = dlm_dir_nodeid(r);
1459
1460         if (dir_nodeid != our_nodeid) {
1461                 r->res_first_lkid = lkb->lkb_id;
1462                 send_lookup(r, lkb);
1463                 return 1;
1464         }
1465
1466         for (;;) {
1467                 /* It's possible for dlm_scand to remove an old rsb for
1468                    this same resource from the toss list, us to create
1469                    a new one, look up the master locally, and find it
1470                    already exists just before dlm_scand does the
1471                    dir_remove() on the previous rsb. */
1472
1473                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1474                                        r->res_length, &ret_nodeid);
1475                 if (!error)
1476                         break;
1477                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1478                 schedule();
1479         }
1480
1481         if (ret_nodeid == our_nodeid) {
1482                 r->res_first_lkid = 0;
1483                 r->res_nodeid = 0;
1484                 lkb->lkb_nodeid = 0;
1485         } else {
1486                 r->res_first_lkid = lkb->lkb_id;
1487                 r->res_nodeid = ret_nodeid;
1488                 lkb->lkb_nodeid = ret_nodeid;
1489         }
1490         return 0;
1491 }
1492
1493 static void process_lookup_list(struct dlm_rsb *r)
1494 {
1495         struct dlm_lkb *lkb, *safe;
1496
1497         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1498                 list_del(&lkb->lkb_rsb_lookup);
1499                 _request_lock(r, lkb);
1500                 schedule();
1501         }
1502 }
1503
1504 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1505
1506 static void confirm_master(struct dlm_rsb *r, int error)
1507 {
1508         struct dlm_lkb *lkb;
1509
1510         if (!r->res_first_lkid)
1511                 return;
1512
1513         switch (error) {
1514         case 0:
1515         case -EINPROGRESS:
1516                 r->res_first_lkid = 0;
1517                 process_lookup_list(r);
1518                 break;
1519
1520         case -EAGAIN:
1521                 /* the remote master didn't queue our NOQUEUE request;
1522                    make a waiting lkb the first_lkid */
1523
1524                 r->res_first_lkid = 0;
1525
1526                 if (!list_empty(&r->res_lookup)) {
1527                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1528                                          lkb_rsb_lookup);
1529                         list_del(&lkb->lkb_rsb_lookup);
1530                         r->res_first_lkid = lkb->lkb_id;
1531                         _request_lock(r, lkb);
1532                 } else
1533                         r->res_nodeid = -1;
1534                 break;
1535
1536         default:
1537                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1538         }
1539 }
1540
1541 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1542                          int namelen, uint32_t parent_lkid, void *ast,
1543                          void *astarg, void *bast, struct dlm_args *args)
1544 {
1545         int rv = -EINVAL;
1546
1547         /* check for invalid arg usage */
1548
1549         if (mode < 0 || mode > DLM_LOCK_EX)
1550                 goto out;
1551
1552         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1553                 goto out;
1554
1555         if (flags & DLM_LKF_CANCEL)
1556                 goto out;
1557
1558         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1559                 goto out;
1560
1561         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1562                 goto out;
1563
1564         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1565                 goto out;
1566
1567         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1568                 goto out;
1569
1570         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1571                 goto out;
1572
1573         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1574                 goto out;
1575
1576         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1577                 goto out;
1578
1579         if (!ast || !lksb)
1580                 goto out;
1581
1582         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1583                 goto out;
1584
1585         /* parent/child locks not yet supported */
1586         if (parent_lkid)
1587                 goto out;
1588
1589         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1590                 goto out;
1591
1592         /* these args will be copied to the lkb in validate_lock_args,
1593            it cannot be done now because when converting locks, fields in
1594            an active lkb cannot be modified before locking the rsb */
1595
1596         args->flags = flags;
1597         args->astaddr = ast;
1598         args->astparam = (long) astarg;
1599         args->bastaddr = bast;
1600         args->mode = mode;
1601         args->lksb = lksb;
1602         rv = 0;
1603  out:
1604         return rv;
1605 }
1606
1607 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1608 {
1609         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1610                       DLM_LKF_FORCEUNLOCK))
1611                 return -EINVAL;
1612
1613         args->flags = flags;
1614         args->astparam = (long) astarg;
1615         return 0;
1616 }
1617
1618 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1619                               struct dlm_args *args)
1620 {
1621         int rv = -EINVAL;
1622
1623         if (args->flags & DLM_LKF_CONVERT) {
1624                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1625                         goto out;
1626
1627                 if (args->flags & DLM_LKF_QUECVT &&
1628                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1629                         goto out;
1630
1631                 rv = -EBUSY;
1632                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1633                         goto out;
1634
1635                 if (lkb->lkb_wait_type)
1636                         goto out;
1637         }
1638
1639         lkb->lkb_exflags = args->flags;
1640         lkb->lkb_sbflags = 0;
1641         lkb->lkb_astaddr = args->astaddr;
1642         lkb->lkb_astparam = args->astparam;
1643         lkb->lkb_bastaddr = args->bastaddr;
1644         lkb->lkb_rqmode = args->mode;
1645         lkb->lkb_lksb = args->lksb;
1646         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1647         lkb->lkb_ownpid = (int) current->pid;
1648         rv = 0;
1649  out:
1650         return rv;
1651 }
1652
1653 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1654 {
1655         int rv = -EINVAL;
1656
1657         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1658                 goto out;
1659
1660         if (args->flags & DLM_LKF_FORCEUNLOCK)
1661                 goto out_ok;
1662
1663         if (args->flags & DLM_LKF_CANCEL &&
1664             lkb->lkb_status == DLM_LKSTS_GRANTED)
1665                 goto out;
1666
1667         if (!(args->flags & DLM_LKF_CANCEL) &&
1668             lkb->lkb_status != DLM_LKSTS_GRANTED)
1669                 goto out;
1670
1671         rv = -EBUSY;
1672         if (lkb->lkb_wait_type)
1673                 goto out;
1674
1675  out_ok:
1676         lkb->lkb_exflags = args->flags;
1677         lkb->lkb_sbflags = 0;
1678         lkb->lkb_astparam = args->astparam;
1679
1680         rv = 0;
1681  out:
1682         return rv;
1683 }
1684
1685 /*
1686  * Four stage 4 varieties:
1687  * do_request(), do_convert(), do_unlock(), do_cancel()
1688  * These are called on the master node for the given lock and
1689  * from the central locking logic.
1690  */
1691
1692 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1693 {
1694         int error = 0;
1695
1696         if (can_be_granted(r, lkb, 1)) {
1697                 grant_lock(r, lkb);
1698                 queue_cast(r, lkb, 0);
1699                 goto out;
1700         }
1701
1702         if (can_be_queued(lkb)) {
1703                 error = -EINPROGRESS;
1704                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1705                 send_blocking_asts(r, lkb);
1706                 goto out;
1707         }
1708
1709         error = -EAGAIN;
1710         if (force_blocking_asts(lkb))
1711                 send_blocking_asts_all(r, lkb);
1712         queue_cast(r, lkb, -EAGAIN);
1713
1714  out:
1715         return error;
1716 }
1717
1718 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1719 {
1720         int error = 0;
1721
1722         /* changing an existing lock may allow others to be granted */
1723
1724         if (can_be_granted(r, lkb, 1)) {
1725                 grant_lock(r, lkb);
1726                 queue_cast(r, lkb, 0);
1727                 grant_pending_locks(r);
1728                 goto out;
1729         }
1730
1731         if (can_be_queued(lkb)) {
1732                 if (is_demoted(lkb))
1733                         grant_pending_locks(r);
1734                 error = -EINPROGRESS;
1735                 del_lkb(r, lkb);
1736                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1737                 send_blocking_asts(r, lkb);
1738                 goto out;
1739         }
1740
1741         error = -EAGAIN;
1742         if (force_blocking_asts(lkb))
1743                 send_blocking_asts_all(r, lkb);
1744         queue_cast(r, lkb, -EAGAIN);
1745
1746  out:
1747         return error;
1748 }
1749
1750 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1751 {
1752         remove_lock(r, lkb);
1753         queue_cast(r, lkb, -DLM_EUNLOCK);
1754         grant_pending_locks(r);
1755         return -DLM_EUNLOCK;
1756 }
1757
1758 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1759    skip the queue_cast(ECANCEL).  It indicates that the request/convert
1760    completed (and queued a normal ast) just before the cancel; we don't
1761    want to clobber the sb_result for the normal ast with ECANCEL. */
1762    
1763 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1764 {
1765         revert_lock(r, lkb);
1766         queue_cast(r, lkb, -DLM_ECANCEL);
1767         grant_pending_locks(r);
1768         return -DLM_ECANCEL;
1769 }
1770
1771 /*
1772  * Four stage 3 varieties:
1773  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1774  */
1775
1776 /* add a new lkb to a possibly new rsb, called by requesting process */
1777
1778 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1779 {
1780         int error;
1781
1782         /* set_master: sets lkb nodeid from r */
1783
1784         error = set_master(r, lkb);
1785         if (error < 0)
1786                 goto out;
1787         if (error) {
1788                 error = 0;
1789                 goto out;
1790         }
1791
1792         if (is_remote(r))
1793                 /* receive_request() calls do_request() on remote node */
1794                 error = send_request(r, lkb);
1795         else
1796                 error = do_request(r, lkb);
1797  out:
1798         return error;
1799 }
1800
1801 /* change some property of an existing lkb, e.g. mode */
1802
1803 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1804 {
1805         int error;
1806
1807         if (is_remote(r))
1808                 /* receive_convert() calls do_convert() on remote node */
1809                 error = send_convert(r, lkb);
1810         else
1811                 error = do_convert(r, lkb);
1812
1813         return error;
1814 }
1815
1816 /* remove an existing lkb from the granted queue */
1817
1818 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1819 {
1820         int error;
1821
1822         if (is_remote(r))
1823                 /* receive_unlock() calls do_unlock() on remote node */
1824                 error = send_unlock(r, lkb);
1825         else
1826                 error = do_unlock(r, lkb);
1827
1828         return error;
1829 }
1830
1831 /* remove an existing lkb from the convert or wait queue */
1832
1833 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1834 {
1835         int error;
1836
1837         if (is_remote(r))
1838                 /* receive_cancel() calls do_cancel() on remote node */
1839                 error = send_cancel(r, lkb);
1840         else
1841                 error = do_cancel(r, lkb);
1842
1843         return error;
1844 }
1845
1846 /*
1847  * Four stage 2 varieties:
1848  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1849  */
1850
1851 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1852                         int len, struct dlm_args *args)
1853 {
1854         struct dlm_rsb *r;
1855         int error;
1856
1857         error = validate_lock_args(ls, lkb, args);
1858         if (error)
1859                 goto out;
1860
1861         error = find_rsb(ls, name, len, R_CREATE, &r);
1862         if (error)
1863                 goto out;
1864
1865         lock_rsb(r);
1866
1867         attach_lkb(r, lkb);
1868         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1869
1870         error = _request_lock(r, lkb);
1871
1872         unlock_rsb(r);
1873         put_rsb(r);
1874
1875  out:
1876         return error;
1877 }
1878
1879 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1880                         struct dlm_args *args)
1881 {
1882         struct dlm_rsb *r;
1883         int error;
1884
1885         r = lkb->lkb_resource;
1886
1887         hold_rsb(r);
1888         lock_rsb(r);
1889
1890         error = validate_lock_args(ls, lkb, args);
1891         if (error)
1892                 goto out;
1893
1894         error = _convert_lock(r, lkb);
1895  out:
1896         unlock_rsb(r);
1897         put_rsb(r);
1898         return error;
1899 }
1900
1901 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1902                        struct dlm_args *args)
1903 {
1904         struct dlm_rsb *r;
1905         int error;
1906
1907         r = lkb->lkb_resource;
1908
1909         hold_rsb(r);
1910         lock_rsb(r);
1911
1912         error = validate_unlock_args(lkb, args);
1913         if (error)
1914                 goto out;
1915
1916         error = _unlock_lock(r, lkb);
1917  out:
1918         unlock_rsb(r);
1919         put_rsb(r);
1920         return error;
1921 }
1922
1923 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1924                        struct dlm_args *args)
1925 {
1926         struct dlm_rsb *r;
1927         int error;
1928
1929         r = lkb->lkb_resource;
1930
1931         hold_rsb(r);
1932         lock_rsb(r);
1933
1934         error = validate_unlock_args(lkb, args);
1935         if (error)
1936                 goto out;
1937
1938         error = _cancel_lock(r, lkb);
1939  out:
1940         unlock_rsb(r);
1941         put_rsb(r);
1942         return error;
1943 }
1944
1945 /*
1946  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1947  */
1948
1949 int dlm_lock(dlm_lockspace_t *lockspace,
1950              int mode,
1951              struct dlm_lksb *lksb,
1952              uint32_t flags,
1953              void *name,
1954              unsigned int namelen,
1955              uint32_t parent_lkid,
1956              void (*ast) (void *astarg),
1957              void *astarg,
1958              void (*bast) (void *astarg, int mode))
1959 {
1960         struct dlm_ls *ls;
1961         struct dlm_lkb *lkb;
1962         struct dlm_args args;
1963         int error, convert = flags & DLM_LKF_CONVERT;
1964
1965         ls = dlm_find_lockspace_local(lockspace);
1966         if (!ls)
1967                 return -EINVAL;
1968
1969         lock_recovery(ls);
1970
1971         if (convert)
1972                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1973         else
1974                 error = create_lkb(ls, &lkb);
1975
1976         if (error)
1977                 goto out;
1978
1979         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1980                               astarg, bast, &args);
1981         if (error)
1982                 goto out_put;
1983
1984         if (convert)
1985                 error = convert_lock(ls, lkb, &args);
1986         else
1987                 error = request_lock(ls, lkb, name, namelen, &args);
1988
1989         if (error == -EINPROGRESS)
1990                 error = 0;
1991  out_put:
1992         if (convert || error)
1993                 __put_lkb(ls, lkb);
1994         if (error == -EAGAIN)
1995                 error = 0;
1996  out:
1997         unlock_recovery(ls);
1998         dlm_put_lockspace(ls);
1999         return error;
2000 }
2001
2002 int dlm_unlock(dlm_lockspace_t *lockspace,
2003                uint32_t lkid,
2004                uint32_t flags,
2005                struct dlm_lksb *lksb,
2006                void *astarg)
2007 {
2008         struct dlm_ls *ls;
2009         struct dlm_lkb *lkb;
2010         struct dlm_args args;
2011         int error;
2012
2013         ls = dlm_find_lockspace_local(lockspace);
2014         if (!ls)
2015                 return -EINVAL;
2016
2017         lock_recovery(ls);
2018
2019         error = find_lkb(ls, lkid, &lkb);
2020         if (error)
2021                 goto out;
2022
2023         error = set_unlock_args(flags, astarg, &args);
2024         if (error)
2025                 goto out_put;
2026
2027         if (flags & DLM_LKF_CANCEL)
2028                 error = cancel_lock(ls, lkb, &args);
2029         else
2030                 error = unlock_lock(ls, lkb, &args);
2031
2032         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2033                 error = 0;
2034  out_put:
2035         dlm_put_lkb(lkb);
2036  out:
2037         unlock_recovery(ls);
2038         dlm_put_lockspace(ls);
2039         return error;
2040 }
2041
2042 /*
2043  * send/receive routines for remote operations and replies
2044  *
2045  * send_args
2046  * send_common
2047  * send_request                 receive_request
2048  * send_convert                 receive_convert
2049  * send_unlock                  receive_unlock
2050  * send_cancel                  receive_cancel
2051  * send_grant                   receive_grant
2052  * send_bast                    receive_bast
2053  * send_lookup                  receive_lookup
2054  * send_remove                  receive_remove
2055  *
2056  *                              send_common_reply
2057  * receive_request_reply        send_request_reply
2058  * receive_convert_reply        send_convert_reply
2059  * receive_unlock_reply         send_unlock_reply
2060  * receive_cancel_reply         send_cancel_reply
2061  * receive_lookup_reply         send_lookup_reply
2062  */
2063
2064 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2065                           int to_nodeid, int mstype,
2066                           struct dlm_message **ms_ret,
2067                           struct dlm_mhandle **mh_ret)
2068 {
2069         struct dlm_message *ms;
2070         struct dlm_mhandle *mh;
2071         char *mb;
2072         int mb_len = sizeof(struct dlm_message);
2073
2074         switch (mstype) {
2075         case DLM_MSG_REQUEST:
2076         case DLM_MSG_LOOKUP:
2077         case DLM_MSG_REMOVE:
2078                 mb_len += r->res_length;
2079                 break;
2080         case DLM_MSG_CONVERT:
2081         case DLM_MSG_UNLOCK:
2082         case DLM_MSG_REQUEST_REPLY:
2083         case DLM_MSG_CONVERT_REPLY:
2084         case DLM_MSG_GRANT:
2085                 if (lkb && lkb->lkb_lvbptr)
2086                         mb_len += r->res_ls->ls_lvblen;
2087                 break;
2088         }
2089
2090         /* get_buffer gives us a message handle (mh) that we need to
2091            pass into lowcomms_commit and a message buffer (mb) that we
2092            write our data into */
2093
2094         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2095         if (!mh)
2096                 return -ENOBUFS;
2097
2098         memset(mb, 0, mb_len);
2099
2100         ms = (struct dlm_message *) mb;
2101
2102         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2103         ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2104         ms->m_header.h_nodeid = dlm_our_nodeid();
2105         ms->m_header.h_length = mb_len;
2106         ms->m_header.h_cmd = DLM_MSG;
2107
2108         ms->m_type = mstype;
2109
2110         *mh_ret = mh;
2111         *ms_ret = ms;
2112         return 0;
2113 }
2114
2115 /* further lowcomms enhancements or alternate implementations may make
2116    the return value from this function useful at some point */
2117
2118 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2119 {
2120         dlm_message_out(ms);
2121         dlm_lowcomms_commit_buffer(mh);
2122         return 0;
2123 }
2124
2125 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2126                       struct dlm_message *ms)
2127 {
2128         ms->m_nodeid   = lkb->lkb_nodeid;
2129         ms->m_pid      = lkb->lkb_ownpid;
2130         ms->m_lkid     = lkb->lkb_id;
2131         ms->m_remid    = lkb->lkb_remid;
2132         ms->m_exflags  = lkb->lkb_exflags;
2133         ms->m_sbflags  = lkb->lkb_sbflags;
2134         ms->m_flags    = lkb->lkb_flags;
2135         ms->m_lvbseq   = lkb->lkb_lvbseq;
2136         ms->m_status   = lkb->lkb_status;
2137         ms->m_grmode   = lkb->lkb_grmode;
2138         ms->m_rqmode   = lkb->lkb_rqmode;
2139         ms->m_hash     = r->res_hash;
2140
2141         /* m_result and m_bastmode are set from function args,
2142            not from lkb fields */
2143
2144         if (lkb->lkb_bastaddr)
2145                 ms->m_asts |= AST_BAST;
2146         if (lkb->lkb_astaddr)
2147                 ms->m_asts |= AST_COMP;
2148
2149         if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2150                 memcpy(ms->m_extra, r->res_name, r->res_length);
2151
2152         else if (lkb->lkb_lvbptr)
2153                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2154
2155 }
2156
2157 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2158 {
2159         struct dlm_message *ms;
2160         struct dlm_mhandle *mh;
2161         int to_nodeid, error;
2162
2163         add_to_waiters(lkb, mstype);
2164
2165         to_nodeid = r->res_nodeid;
2166
2167         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2168         if (error)
2169                 goto fail;
2170
2171         send_args(r, lkb, ms);
2172
2173         error = send_message(mh, ms);
2174         if (error)
2175                 goto fail;
2176         return 0;
2177
2178  fail:
2179         remove_from_waiters(lkb);
2180         return error;
2181 }
2182
2183 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2184 {
2185         return send_common(r, lkb, DLM_MSG_REQUEST);
2186 }
2187
2188 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2189 {
2190         int error;
2191
2192         error = send_common(r, lkb, DLM_MSG_CONVERT);
2193
2194         /* down conversions go without a reply from the master */
2195         if (!error && down_conversion(lkb)) {
2196                 remove_from_waiters(lkb);
2197                 r->res_ls->ls_stub_ms.m_result = 0;
2198                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2199         }
2200
2201         return error;
2202 }
2203
2204 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2205    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2206    that the master is still correct. */
2207
2208 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2209 {
2210         return send_common(r, lkb, DLM_MSG_UNLOCK);
2211 }
2212
2213 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2214 {
2215         return send_common(r, lkb, DLM_MSG_CANCEL);
2216 }
2217
2218 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2219 {
2220         struct dlm_message *ms;
2221         struct dlm_mhandle *mh;
2222         int to_nodeid, error;
2223
2224         to_nodeid = lkb->lkb_nodeid;
2225
2226         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2227         if (error)
2228                 goto out;
2229
2230         send_args(r, lkb, ms);
2231
2232         ms->m_result = 0;
2233
2234         error = send_message(mh, ms);
2235  out:
2236         return error;
2237 }
2238
2239 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2240 {
2241         struct dlm_message *ms;
2242         struct dlm_mhandle *mh;
2243         int to_nodeid, error;
2244
2245         to_nodeid = lkb->lkb_nodeid;
2246
2247         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2248         if (error)
2249                 goto out;
2250
2251         send_args(r, lkb, ms);
2252
2253         ms->m_bastmode = mode;
2254
2255         error = send_message(mh, ms);
2256  out:
2257         return error;
2258 }
2259
2260 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2261 {
2262         struct dlm_message *ms;
2263         struct dlm_mhandle *mh;
2264         int to_nodeid, error;
2265
2266         add_to_waiters(lkb, DLM_MSG_LOOKUP);
2267
2268         to_nodeid = dlm_dir_nodeid(r);
2269
2270         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2271         if (error)
2272                 goto fail;
2273
2274         send_args(r, lkb, ms);
2275
2276         error = send_message(mh, ms);
2277         if (error)
2278                 goto fail;
2279         return 0;
2280
2281  fail:
2282         remove_from_waiters(lkb);
2283         return error;
2284 }
2285
2286 static int send_remove(struct dlm_rsb *r)
2287 {
2288         struct dlm_message *ms;
2289         struct dlm_mhandle *mh;
2290         int to_nodeid, error;
2291
2292         to_nodeid = dlm_dir_nodeid(r);
2293
2294         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2295         if (error)
2296                 goto out;
2297
2298         memcpy(ms->m_extra, r->res_name, r->res_length);
2299         ms->m_hash = r->res_hash;
2300
2301         error = send_message(mh, ms);
2302  out:
2303         return error;
2304 }
2305
2306 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2307                              int mstype, int rv)
2308 {
2309         struct dlm_message *ms;
2310         struct dlm_mhandle *mh;
2311         int to_nodeid, error;
2312
2313         to_nodeid = lkb->lkb_nodeid;
2314
2315         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2316         if (error)
2317                 goto out;
2318
2319         send_args(r, lkb, ms);
2320
2321         ms->m_result = rv;
2322
2323         error = send_message(mh, ms);
2324  out:
2325         return error;
2326 }
2327
2328 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2329 {
2330         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2331 }
2332
2333 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2334 {
2335         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2336 }
2337
2338 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2339 {
2340         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2341 }
2342
2343 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2344 {
2345         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2346 }
2347
2348 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2349                              int ret_nodeid, int rv)
2350 {
2351         struct dlm_rsb *r = &ls->ls_stub_rsb;
2352         struct dlm_message *ms;
2353         struct dlm_mhandle *mh;
2354         int error, nodeid = ms_in->m_header.h_nodeid;
2355
2356         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2357         if (error)
2358                 goto out;
2359
2360         ms->m_lkid = ms_in->m_lkid;
2361         ms->m_result = rv;
2362         ms->m_nodeid = ret_nodeid;
2363
2364         error = send_message(mh, ms);
2365  out:
2366         return error;
2367 }
2368
2369 /* which args we save from a received message depends heavily on the type
2370    of message, unlike the send side where we can safely send everything about
2371    the lkb for any type of message */
2372
2373 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2374 {
2375         lkb->lkb_exflags = ms->m_exflags;
2376         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2377                          (ms->m_flags & 0x0000FFFF);
2378 }
2379
2380 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2381 {
2382         lkb->lkb_sbflags = ms->m_sbflags;
2383         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2384                          (ms->m_flags & 0x0000FFFF);
2385 }
2386
2387 static int receive_extralen(struct dlm_message *ms)
2388 {
2389         return (ms->m_header.h_length - sizeof(struct dlm_message));
2390 }
2391
2392 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2393                        struct dlm_message *ms)
2394 {
2395         int len;
2396
2397         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2398                 if (!lkb->lkb_lvbptr)
2399                         lkb->lkb_lvbptr = allocate_lvb(ls);
2400                 if (!lkb->lkb_lvbptr)
2401                         return -ENOMEM;
2402                 len = receive_extralen(ms);
2403                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2404         }
2405         return 0;
2406 }
2407
2408 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2409                                 struct dlm_message *ms)
2410 {
2411         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2412         lkb->lkb_ownpid = ms->m_pid;
2413         lkb->lkb_remid = ms->m_lkid;
2414         lkb->lkb_grmode = DLM_LOCK_IV;
2415         lkb->lkb_rqmode = ms->m_rqmode;
2416         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2417         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2418
2419         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2420
2421         if (receive_lvb(ls, lkb, ms))
2422                 return -ENOMEM;
2423
2424         return 0;
2425 }
2426
2427 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2428                                 struct dlm_message *ms)
2429 {
2430         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2431                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2432                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2433                           lkb->lkb_id, lkb->lkb_remid);
2434                 return -EINVAL;
2435         }
2436
2437         if (!is_master_copy(lkb))
2438                 return -EINVAL;
2439
2440         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2441                 return -EBUSY;
2442
2443         if (receive_lvb(ls, lkb, ms))
2444                 return -ENOMEM;
2445
2446         lkb->lkb_rqmode = ms->m_rqmode;
2447         lkb->lkb_lvbseq = ms->m_lvbseq;
2448
2449         return 0;
2450 }
2451
2452 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2453                                struct dlm_message *ms)
2454 {
2455         if (!is_master_copy(lkb))
2456                 return -EINVAL;
2457         if (receive_lvb(ls, lkb, ms))
2458                 return -ENOMEM;
2459         return 0;
2460 }
2461
2462 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2463    uses to send a reply and that the remote end uses to process the reply. */
2464
2465 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2466 {
2467         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2468         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2469         lkb->lkb_remid = ms->m_lkid;
2470 }
2471
2472 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2473 {
2474         struct dlm_lkb *lkb;
2475         struct dlm_rsb *r;
2476         int error, namelen;
2477
2478         error = create_lkb(ls, &lkb);
2479         if (error)
2480                 goto fail;
2481
2482         receive_flags(lkb, ms);
2483         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2484         error = receive_request_args(ls, lkb, ms);
2485         if (error) {
2486                 __put_lkb(ls, lkb);
2487                 goto fail;
2488         }
2489
2490         namelen = receive_extralen(ms);
2491
2492         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2493         if (error) {
2494                 __put_lkb(ls, lkb);
2495                 goto fail;
2496         }
2497
2498         lock_rsb(r);
2499
2500         attach_lkb(r, lkb);
2501         error = do_request(r, lkb);
2502         send_request_reply(r, lkb, error);
2503
2504         unlock_rsb(r);
2505         put_rsb(r);
2506
2507         if (error == -EINPROGRESS)
2508                 error = 0;
2509         if (error)
2510                 dlm_put_lkb(lkb);
2511         return;
2512
2513  fail:
2514         setup_stub_lkb(ls, ms);
2515         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2516 }
2517
2518 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2519 {
2520         struct dlm_lkb *lkb;
2521         struct dlm_rsb *r;
2522         int error, reply = 1;
2523
2524         error = find_lkb(ls, ms->m_remid, &lkb);
2525         if (error)
2526                 goto fail;
2527
2528         r = lkb->lkb_resource;
2529
2530         hold_rsb(r);
2531         lock_rsb(r);
2532
2533         receive_flags(lkb, ms);
2534         error = receive_convert_args(ls, lkb, ms);
2535         if (error)
2536                 goto out;
2537         reply = !down_conversion(lkb);
2538
2539         error = do_convert(r, lkb);
2540  out:
2541         if (reply)
2542                 send_convert_reply(r, lkb, error);
2543
2544         unlock_rsb(r);
2545         put_rsb(r);
2546         dlm_put_lkb(lkb);
2547         return;
2548
2549  fail:
2550         setup_stub_lkb(ls, ms);
2551         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2552 }
2553
2554 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2555 {
2556         struct dlm_lkb *lkb;
2557         struct dlm_rsb *r;
2558         int error;
2559
2560         error = find_lkb(ls, ms->m_remid, &lkb);
2561         if (error)
2562                 goto fail;
2563
2564         r = lkb->lkb_resource;
2565
2566         hold_rsb(r);
2567         lock_rsb(r);
2568
2569         receive_flags(lkb, ms);
2570         error = receive_unlock_args(ls, lkb, ms);
2571         if (error)
2572                 goto out;
2573
2574         error = do_unlock(r, lkb);
2575  out:
2576         send_unlock_reply(r, lkb, error);
2577
2578         unlock_rsb(r);
2579         put_rsb(r);
2580         dlm_put_lkb(lkb);
2581         return;
2582
2583  fail:
2584         setup_stub_lkb(ls, ms);
2585         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2586 }
2587
2588 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2589 {
2590         struct dlm_lkb *lkb;
2591         struct dlm_rsb *r;
2592         int error;
2593
2594         error = find_lkb(ls, ms->m_remid, &lkb);
2595         if (error)
2596                 goto fail;
2597
2598         receive_flags(lkb, ms);
2599
2600         r = lkb->lkb_resource;
2601
2602         hold_rsb(r);
2603         lock_rsb(r);
2604
2605         error = do_cancel(r, lkb);
2606         send_cancel_reply(r, lkb, error);
2607
2608         unlock_rsb(r);
2609         put_rsb(r);
2610         dlm_put_lkb(lkb);
2611         return;
2612
2613  fail:
2614         setup_stub_lkb(ls, ms);
2615         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2616 }
2617
2618 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2619 {
2620         struct dlm_lkb *lkb;
2621         struct dlm_rsb *r;
2622         int error;
2623
2624         error = find_lkb(ls, ms->m_remid, &lkb);
2625         if (error) {
2626                 log_error(ls, "receive_grant no lkb");
2627                 return;
2628         }
2629         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2630
2631         r = lkb->lkb_resource;
2632
2633         hold_rsb(r);
2634         lock_rsb(r);
2635
2636         receive_flags_reply(lkb, ms);
2637         grant_lock_pc(r, lkb, ms);
2638         queue_cast(r, lkb, 0);
2639
2640         unlock_rsb(r);
2641         put_rsb(r);
2642         dlm_put_lkb(lkb);
2643 }
2644
2645 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2646 {
2647         struct dlm_lkb *lkb;
2648         struct dlm_rsb *r;
2649         int error;
2650
2651         error = find_lkb(ls, ms->m_remid, &lkb);
2652         if (error) {
2653                 log_error(ls, "receive_bast no lkb");
2654                 return;
2655         }
2656         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2657
2658         r = lkb->lkb_resource;
2659
2660         hold_rsb(r);
2661         lock_rsb(r);
2662
2663         queue_bast(r, lkb, ms->m_bastmode);
2664
2665         unlock_rsb(r);
2666         put_rsb(r);
2667         dlm_put_lkb(lkb);
2668 }
2669
2670 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2671 {
2672         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2673
2674         from_nodeid = ms->m_header.h_nodeid;
2675         our_nodeid = dlm_our_nodeid();
2676
2677         len = receive_extralen(ms);
2678
2679         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2680         if (dir_nodeid != our_nodeid) {
2681                 log_error(ls, "lookup dir_nodeid %d from %d",
2682                           dir_nodeid, from_nodeid);
2683                 error = -EINVAL;
2684                 ret_nodeid = -1;
2685                 goto out;
2686         }
2687
2688         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2689
2690         /* Optimization: we're master so treat lookup as a request */
2691         if (!error && ret_nodeid == our_nodeid) {
2692                 receive_request(ls, ms);
2693                 return;
2694         }
2695  out:
2696         send_lookup_reply(ls, ms, ret_nodeid, error);
2697 }
2698
2699 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2700 {
2701         int len, dir_nodeid, from_nodeid;
2702
2703         from_nodeid = ms->m_header.h_nodeid;
2704
2705         len = receive_extralen(ms);
2706
2707         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2708         if (dir_nodeid != dlm_our_nodeid()) {
2709                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2710                           dir_nodeid, from_nodeid);
2711                 return;
2712         }
2713
2714         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2715 }
2716
2717 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2718 {
2719         struct dlm_lkb *lkb;
2720         struct dlm_rsb *r;
2721         int error, mstype;
2722
2723         error = find_lkb(ls, ms->m_remid, &lkb);
2724         if (error) {
2725                 log_error(ls, "receive_request_reply no lkb");
2726                 return;
2727         }
2728         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2729
2730         mstype = lkb->lkb_wait_type;
2731         error = remove_from_waiters(lkb);
2732         if (error) {
2733                 log_error(ls, "receive_request_reply not on waiters");
2734                 goto out;
2735         }
2736
2737         /* this is the value returned from do_request() on the master */
2738         error = ms->m_result;
2739
2740         r = lkb->lkb_resource;
2741         hold_rsb(r);
2742         lock_rsb(r);
2743
2744         /* Optimization: the dir node was also the master, so it took our
2745            lookup as a request and sent request reply instead of lookup reply */
2746         if (mstype == DLM_MSG_LOOKUP) {
2747                 r->res_nodeid = ms->m_header.h_nodeid;
2748                 lkb->lkb_nodeid = r->res_nodeid;
2749         }
2750
2751         switch (error) {
2752         case -EAGAIN:
2753                 /* request would block (be queued) on remote master;
2754                    the unhold undoes the original ref from create_lkb()
2755                    so it leads to the lkb being freed */
2756                 queue_cast(r, lkb, -EAGAIN);
2757                 confirm_master(r, -EAGAIN);
2758                 unhold_lkb(lkb);
2759                 break;
2760
2761         case -EINPROGRESS:
2762         case 0:
2763                 /* request was queued or granted on remote master */
2764                 receive_flags_reply(lkb, ms);
2765                 lkb->lkb_remid = ms->m_lkid;
2766                 if (error)
2767                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
2768                 else {
2769                         grant_lock_pc(r, lkb, ms);
2770                         queue_cast(r, lkb, 0);
2771                 }
2772                 confirm_master(r, error);
2773                 break;
2774
2775         case -EBADR:
2776         case -ENOTBLK:
2777                 /* find_rsb failed to find rsb or rsb wasn't master */
2778                 r->res_nodeid = -1;
2779                 lkb->lkb_nodeid = -1;
2780                 _request_lock(r, lkb);
2781                 break;
2782
2783         default:
2784                 log_error(ls, "receive_request_reply error %d", error);
2785         }
2786
2787         unlock_rsb(r);
2788         put_rsb(r);
2789  out:
2790         dlm_put_lkb(lkb);
2791 }
2792
2793 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2794                                     struct dlm_message *ms)
2795 {
2796         int error = ms->m_result;
2797
2798         /* this is the value returned from do_convert() on the master */
2799
2800         switch (error) {
2801         case -EAGAIN:
2802                 /* convert would block (be queued) on remote master */
2803                 queue_cast(r, lkb, -EAGAIN);
2804                 break;
2805
2806         case -EINPROGRESS:
2807                 /* convert was queued on remote master */
2808                 del_lkb(r, lkb);
2809                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2810                 break;
2811
2812         case 0:
2813                 /* convert was granted on remote master */
2814                 receive_flags_reply(lkb, ms);
2815                 grant_lock_pc(r, lkb, ms);
2816                 queue_cast(r, lkb, 0);
2817                 break;
2818
2819         default:
2820                 log_error(r->res_ls, "receive_convert_reply error %d", error);
2821         }
2822 }
2823
2824 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2825 {
2826         struct dlm_rsb *r = lkb->lkb_resource;
2827
2828         hold_rsb(r);
2829         lock_rsb(r);
2830
2831         __receive_convert_reply(r, lkb, ms);
2832
2833         unlock_rsb(r);
2834         put_rsb(r);
2835 }
2836
2837 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2838 {
2839         struct dlm_lkb *lkb;
2840         int error;
2841
2842         error = find_lkb(ls, ms->m_remid, &lkb);
2843         if (error) {
2844                 log_error(ls, "receive_convert_reply no lkb");
2845                 return;
2846         }
2847         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2848
2849         error = remove_from_waiters(lkb);
2850         if (error) {
2851                 log_error(ls, "receive_convert_reply not on waiters");
2852                 goto out;
2853         }
2854
2855         _receive_convert_reply(lkb, ms);
2856  out:
2857         dlm_put_lkb(lkb);
2858 }
2859
2860 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2861 {
2862         struct dlm_rsb *r = lkb->lkb_resource;
2863         int error = ms->m_result;
2864
2865         hold_rsb(r);
2866         lock_rsb(r);
2867
2868         /* this is the value returned from do_unlock() on the master */
2869
2870         switch (error) {
2871         case -DLM_EUNLOCK:
2872                 receive_flags_reply(lkb, ms);
2873                 remove_lock_pc(r, lkb);
2874                 queue_cast(r, lkb, -DLM_EUNLOCK);
2875                 break;
2876         default:
2877                 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2878         }
2879
2880         unlock_rsb(r);
2881         put_rsb(r);
2882 }
2883
2884 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2885 {
2886         struct dlm_lkb *lkb;
2887         int error;
2888
2889         error = find_lkb(ls, ms->m_remid, &lkb);
2890         if (error) {
2891                 log_error(ls, "receive_unlock_reply no lkb");
2892                 return;
2893         }
2894         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2895
2896         error = remove_from_waiters(lkb);
2897         if (error) {
2898                 log_error(ls, "receive_unlock_reply not on waiters");
2899                 goto out;
2900         }
2901
2902         _receive_unlock_reply(lkb, ms);
2903  out:
2904         dlm_put_lkb(lkb);
2905 }
2906
2907 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2908 {
2909         struct dlm_rsb *r = lkb->lkb_resource;
2910         int error = ms->m_result;
2911
2912         hold_rsb(r);
2913         lock_rsb(r);
2914
2915         /* this is the value returned from do_cancel() on the master */
2916
2917         switch (error) {
2918         case -DLM_ECANCEL:
2919                 receive_flags_reply(lkb, ms);
2920                 revert_lock_pc(r, lkb);
2921                 queue_cast(r, lkb, -DLM_ECANCEL);
2922                 break;
2923         default:
2924                 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2925         }
2926
2927         unlock_rsb(r);
2928         put_rsb(r);
2929 }
2930
2931 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2932 {
2933         struct dlm_lkb *lkb;
2934         int error;
2935
2936         error = find_lkb(ls, ms->m_remid, &lkb);
2937         if (error) {
2938                 log_error(ls, "receive_cancel_reply no lkb");
2939                 return;
2940         }
2941         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2942
2943         error = remove_from_waiters(lkb);
2944         if (error) {
2945                 log_error(ls, "receive_cancel_reply not on waiters");
2946                 goto out;
2947         }
2948
2949         _receive_cancel_reply(lkb, ms);
2950  out:
2951         dlm_put_lkb(lkb);
2952 }
2953
2954 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2955 {
2956         struct dlm_lkb *lkb;
2957         struct dlm_rsb *r;
2958         int error, ret_nodeid;
2959
2960         error = find_lkb(ls, ms->m_lkid, &lkb);
2961         if (error) {
2962                 log_error(ls, "receive_lookup_reply no lkb");
2963                 return;
2964         }
2965
2966         error = remove_from_waiters(lkb);
2967         if (error) {
2968                 log_error(ls, "receive_lookup_reply not on waiters");
2969                 goto out;
2970         }
2971
2972         /* this is the value returned by dlm_dir_lookup on dir node
2973            FIXME: will a non-zero error ever be returned? */
2974         error = ms->m_result;
2975
2976         r = lkb->lkb_resource;
2977         hold_rsb(r);
2978         lock_rsb(r);
2979
2980         ret_nodeid = ms->m_nodeid;
2981         if (ret_nodeid == dlm_our_nodeid()) {
2982                 r->res_nodeid = 0;
2983                 ret_nodeid = 0;
2984                 r->res_first_lkid = 0;
2985         } else {
2986                 /* set_master() will copy res_nodeid to lkb_nodeid */
2987                 r->res_nodeid = ret_nodeid;
2988         }
2989
2990         _request_lock(r, lkb);
2991
2992         if (!ret_nodeid)
2993                 process_lookup_list(r);
2994
2995         unlock_rsb(r);
2996         put_rsb(r);
2997  out:
2998         dlm_put_lkb(lkb);
2999 }
3000
3001 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3002 {
3003         struct dlm_message *ms = (struct dlm_message *) hd;
3004         struct dlm_ls *ls;
3005         int error;
3006
3007         if (!recovery)
3008                 dlm_message_in(ms);
3009
3010         ls = dlm_find_lockspace_global(hd->h_lockspace);
3011         if (!ls) {
3012                 log_print("drop message %d from %d for unknown lockspace %d",
3013                           ms->m_type, nodeid, hd->h_lockspace);
3014                 return -EINVAL;
3015         }
3016
3017         /* recovery may have just ended leaving a bunch of backed-up requests
3018            in the requestqueue; wait while dlm_recoverd clears them */
3019
3020         if (!recovery)
3021                 dlm_wait_requestqueue(ls);
3022
3023         /* recovery may have just started while there were a bunch of
3024            in-flight requests -- save them in requestqueue to be processed
3025            after recovery.  we can't let dlm_recvd block on the recovery
3026            lock.  if dlm_recoverd is calling this function to clear the
3027            requestqueue, it needs to be interrupted (-EINTR) if another
3028            recovery operation is starting. */
3029
3030         while (1) {
3031                 if (dlm_locking_stopped(ls)) {
3032                         if (!recovery)
3033                                 dlm_add_requestqueue(ls, nodeid, hd);
3034                         error = -EINTR;
3035                         goto out;
3036                 }
3037
3038                 if (lock_recovery_try(ls))
3039                         break;
3040                 schedule();
3041         }
3042
3043         switch (ms->m_type) {
3044
3045         /* messages sent to a master node */
3046
3047         case DLM_MSG_REQUEST:
3048                 receive_request(ls, ms);
3049                 break;
3050
3051         case DLM_MSG_CONVERT:
3052                 receive_convert(ls, ms);
3053                 break;
3054
3055         case DLM_MSG_UNLOCK:
3056                 receive_unlock(ls, ms);
3057                 break;
3058
3059         case DLM_MSG_CANCEL:
3060                 receive_cancel(ls, ms);
3061                 break;
3062
3063         /* messages sent from a master node (replies to above) */
3064
3065         case DLM_MSG_REQUEST_REPLY:
3066                 receive_request_reply(ls, ms);
3067                 break;
3068
3069         case DLM_MSG_CONVERT_REPLY:
3070                 receive_convert_reply(ls, ms);
3071                 break;
3072
3073         case DLM_MSG_UNLOCK_REPLY:
3074                 receive_unlock_reply(ls, ms);
3075                 break;
3076
3077         case DLM_MSG_CANCEL_REPLY:
3078                 receive_cancel_reply(ls, ms);
3079                 break;
3080
3081         /* messages sent from a master node (only two types of async msg) */
3082
3083         case DLM_MSG_GRANT:
3084                 receive_grant(ls, ms);
3085                 break;
3086
3087         case DLM_MSG_BAST:
3088                 receive_bast(ls, ms);
3089                 break;
3090
3091         /* messages sent to a dir node */
3092
3093         case DLM_MSG_LOOKUP:
3094                 receive_lookup(ls, ms);
3095                 break;
3096
3097         case DLM_MSG_REMOVE:
3098                 receive_remove(ls, ms);
3099                 break;
3100
3101         /* messages sent from a dir node (remove has no reply) */
3102
3103         case DLM_MSG_LOOKUP_REPLY:
3104                 receive_lookup_reply(ls, ms);
3105                 break;
3106
3107         default:
3108                 log_error(ls, "unknown message type %d", ms->m_type);
3109         }
3110
3111         unlock_recovery(ls);
3112  out:
3113         dlm_put_lockspace(ls);
3114         dlm_astd_wake();
3115         return 0;
3116 }
3117
3118
3119 /*
3120  * Recovery related
3121  */
3122
3123 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3124 {
3125         if (middle_conversion(lkb)) {
3126                 hold_lkb(lkb);
3127                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3128                 _remove_from_waiters(lkb);
3129                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3130
3131                 /* Same special case as in receive_rcom_lock_args() */
3132                 lkb->lkb_grmode = DLM_LOCK_IV;
3133                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3134                 unhold_lkb(lkb);
3135
3136         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3137                 lkb->lkb_flags |= DLM_IFL_RESEND;
3138         }
3139
3140         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3141            conversions are async; there's no reply from the remote master */
3142 }
3143
3144 /* A waiting lkb needs recovery if the master node has failed, or
3145    the master node is changing (only when no directory is used) */
3146
3147 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3148 {
3149         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3150                 return 1;
3151
3152         if (!dlm_no_directory(ls))
3153                 return 0;
3154
3155         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3156                 return 1;
3157
3158         return 0;
3159 }
3160
3161 /* Recovery for locks that are waiting for replies from nodes that are now
3162    gone.  We can just complete unlocks and cancels by faking a reply from the
3163    dead node.  Requests and up-conversions we flag to be resent after
3164    recovery.  Down-conversions can just be completed with a fake reply like
3165    unlocks.  Conversions between PR and CW need special attention. */
3166
3167 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3168 {
3169         struct dlm_lkb *lkb, *safe;
3170
3171         mutex_lock(&ls->ls_waiters_mutex);
3172
3173         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3174                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3175                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3176
3177                 /* all outstanding lookups, regardless of destination  will be
3178                    resent after recovery is done */
3179
3180                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3181                         lkb->lkb_flags |= DLM_IFL_RESEND;
3182                         continue;
3183                 }
3184
3185                 if (!waiter_needs_recovery(ls, lkb))
3186                         continue;
3187
3188                 switch (lkb->lkb_wait_type) {
3189
3190                 case DLM_MSG_REQUEST:
3191                         lkb->lkb_flags |= DLM_IFL_RESEND;
3192                         break;
3193
3194                 case DLM_MSG_CONVERT:
3195                         recover_convert_waiter(ls, lkb);
3196                         break;
3197
3198                 case DLM_MSG_UNLOCK:
3199                         hold_lkb(lkb);
3200                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3201                         _remove_from_waiters(lkb);
3202                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3203                         dlm_put_lkb(lkb);
3204                         break;
3205
3206                 case DLM_MSG_CANCEL:
3207                         hold_lkb(lkb);
3208                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3209                         _remove_from_waiters(lkb);
3210                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3211                         dlm_put_lkb(lkb);
3212                         break;
3213
3214                 default:
3215                         log_error(ls, "invalid lkb wait_type %d",
3216                                   lkb->lkb_wait_type);
3217                 }
3218                 schedule();
3219         }
3220         mutex_unlock(&ls->ls_waiters_mutex);
3221 }
3222
3223 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3224 {
3225         struct dlm_lkb *lkb;
3226         int rv = 0;
3227
3228         mutex_lock(&ls->ls_waiters_mutex);
3229         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3230                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3231                         rv = lkb->lkb_wait_type;
3232                         _remove_from_waiters(lkb);
3233                         lkb->lkb_flags &= ~DLM_IFL_RESEND;
3234                         break;
3235                 }
3236         }
3237         mutex_unlock(&ls->ls_waiters_mutex);
3238
3239         if (!rv)
3240                 lkb = NULL;
3241         *lkb_ret = lkb;
3242         return rv;
3243 }
3244
3245 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3246    master or dir-node for r.  Processing the lkb may result in it being placed
3247    back on waiters. */
3248
3249 int dlm_recover_waiters_post(struct dlm_ls *ls)
3250 {
3251         struct dlm_lkb *lkb;
3252         struct dlm_rsb *r;
3253         int error = 0, mstype;
3254
3255         while (1) {
3256                 if (dlm_locking_stopped(ls)) {
3257                         log_debug(ls, "recover_waiters_post aborted");
3258                         error = -EINTR;
3259                         break;
3260                 }
3261
3262                 mstype = remove_resend_waiter(ls, &lkb);
3263                 if (!mstype)
3264                         break;
3265
3266                 r = lkb->lkb_resource;
3267
3268                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3269                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3270
3271                 switch (mstype) {
3272
3273                 case DLM_MSG_LOOKUP:
3274                         hold_rsb(r);
3275                         lock_rsb(r);
3276                         _request_lock(r, lkb);
3277                         if (is_master(r))
3278                                 confirm_master(r, 0);
3279                         unlock_rsb(r);
3280                         put_rsb(r);
3281                         break;
3282
3283                 case DLM_MSG_REQUEST:
3284                         hold_rsb(r);
3285                         lock_rsb(r);
3286                         _request_lock(r, lkb);
3287                         unlock_rsb(r);
3288                         put_rsb(r);
3289                         break;
3290
3291                 case DLM_MSG_CONVERT:
3292                         hold_rsb(r);
3293                         lock_rsb(r);
3294                         _convert_lock(r, lkb);
3295                         unlock_rsb(r);
3296                         put_rsb(r);
3297                         break;
3298
3299                 default:
3300                         log_error(ls, "recover_waiters_post type %d", mstype);
3301                 }
3302         }
3303
3304         return error;
3305 }
3306
3307 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3308                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3309 {
3310         struct dlm_ls *ls = r->res_ls;
3311         struct dlm_lkb *lkb, *safe;
3312
3313         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3314                 if (test(ls, lkb)) {
3315                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3316                         del_lkb(r, lkb);
3317                         /* this put should free the lkb */
3318                         if (!dlm_put_lkb(lkb))
3319                                 log_error(ls, "purged lkb not released");
3320                 }
3321         }
3322 }
3323
3324 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3325 {
3326         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3327 }
3328
3329 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3330 {
3331         return is_master_copy(lkb);
3332 }
3333
3334 static void purge_dead_locks(struct dlm_rsb *r)
3335 {
3336         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3337         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3338         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3339 }
3340
3341 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3342 {
3343         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3344         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3345         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3346 }
3347
3348 /* Get rid of locks held by nodes that are gone. */
3349
3350 int dlm_purge_locks(struct dlm_ls *ls)
3351 {
3352         struct dlm_rsb *r;
3353
3354         log_debug(ls, "dlm_purge_locks");
3355
3356         down_write(&ls->ls_root_sem);
3357         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3358                 hold_rsb(r);
3359                 lock_rsb(r);
3360                 if (is_master(r))
3361                         purge_dead_locks(r);
3362                 unlock_rsb(r);
3363                 unhold_rsb(r);
3364
3365                 schedule();
3366         }
3367         up_write(&ls->ls_root_sem);
3368
3369         return 0;
3370 }
3371
3372 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3373 {
3374         struct dlm_rsb *r, *r_ret = NULL;
3375
3376         read_lock(&ls->ls_rsbtbl[bucket].lock);
3377         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3378                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3379                         continue;
3380                 hold_rsb(r);
3381                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3382                 r_ret = r;
3383                 break;
3384         }
3385         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3386         return r_ret;
3387 }
3388
3389 void dlm_grant_after_purge(struct dlm_ls *ls)
3390 {
3391         struct dlm_rsb *r;
3392         int bucket = 0;
3393
3394         while (1) {
3395                 r = find_purged_rsb(ls, bucket);
3396                 if (!r) {
3397                         if (bucket == ls->ls_rsbtbl_size - 1)
3398                                 break;
3399                         bucket++;
3400                         continue;
3401                 }
3402                 lock_rsb(r);
3403                 if (is_master(r)) {
3404                         grant_pending_locks(r);
3405                         confirm_master(r, 0);
3406                 }
3407                 unlock_rsb(r);
3408                 put_rsb(r);
3409                 schedule();
3410         }
3411 }
3412
3413 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3414                                          uint32_t remid)
3415 {
3416         struct dlm_lkb *lkb;
3417
3418         list_for_each_entry(lkb, head, lkb_statequeue) {
3419                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3420                         return lkb;
3421         }
3422         return NULL;
3423 }
3424
3425 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3426                                     uint32_t remid)
3427 {
3428         struct dlm_lkb *lkb;
3429
3430         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3431         if (lkb)
3432                 return lkb;
3433         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3434         if (lkb)
3435                 return lkb;
3436         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3437         if (lkb)
3438                 return lkb;
3439         return NULL;
3440 }
3441
3442 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3443                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3444 {
3445         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3446         int lvblen;
3447
3448         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3449         lkb->lkb_ownpid = rl->rl_ownpid;
3450         lkb->lkb_remid = rl->rl_lkid;
3451         lkb->lkb_exflags = rl->rl_exflags;
3452         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3453         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3454         lkb->lkb_lvbseq = rl->rl_lvbseq;
3455         lkb->lkb_rqmode = rl->rl_rqmode;
3456         lkb->lkb_grmode = rl->rl_grmode;
3457         /* don't set lkb_status because add_lkb wants to itself */
3458
3459         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3460         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3461
3462         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3463                 lkb->lkb_lvbptr = allocate_lvb(ls);
3464                 if (!lkb->lkb_lvbptr)
3465                         return -ENOMEM;
3466                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3467                          sizeof(struct rcom_lock);
3468                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3469         }
3470
3471         /* Conversions between PR and CW (middle modes) need special handling.
3472            The real granted mode of these converting locks cannot be determined
3473            until all locks have been rebuilt on the rsb (recover_conversion) */
3474
3475         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3476                 rl->rl_status = DLM_LKSTS_CONVERT;
3477                 lkb->lkb_grmode = DLM_LOCK_IV;
3478                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3479         }
3480
3481         return 0;
3482 }
3483
3484 /* This lkb may have been recovered in a previous aborted recovery so we need
3485    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3486    If so we just send back a standard reply.  If not, we create a new lkb with
3487    the given values and send back our lkid.  We send back our lkid by sending
3488    back the rcom_lock struct we got but with the remid field filled in. */
3489
3490 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3491 {
3492         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3493         struct dlm_rsb *r;
3494         struct dlm_lkb *lkb;
3495         int error;
3496
3497         if (rl->rl_parent_lkid) {
3498                 error = -EOPNOTSUPP;
3499                 goto out;
3500         }
3501
3502         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3503         if (error)
3504                 goto out;
3505
3506         lock_rsb(r);
3507
3508         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3509         if (lkb) {
3510                 error = -EEXIST;
3511                 goto out_remid;
3512         }
3513
3514         error = create_lkb(ls, &lkb);
3515         if (error)
3516                 goto out_unlock;
3517
3518         error = receive_rcom_lock_args(ls, lkb, r, rc);
3519         if (error) {
3520                 __put_lkb(ls, lkb);
3521                 goto out_unlock;
3522         }
3523
3524         attach_lkb(r, lkb);
3525         add_lkb(r, lkb, rl->rl_status);
3526         error = 0;
3527
3528  out_remid:
3529         /* this is the new value returned to the lock holder for
3530            saving in its process-copy lkb */
3531         rl->rl_remid = lkb->lkb_id;
3532
3533  out_unlock:
3534         unlock_rsb(r);
3535         put_rsb(r);
3536  out:
3537         if (error)
3538                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3539         rl->rl_result = error;
3540         return error;
3541 }
3542
3543 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3544 {
3545         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3546         struct dlm_rsb *r;
3547         struct dlm_lkb *lkb;
3548         int error;
3549
3550         error = find_lkb(ls, rl->rl_lkid, &lkb);
3551         if (error) {
3552                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3553                 return error;
3554         }
3555
3556         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3557
3558         error = rl->rl_result;
3559
3560         r = lkb->lkb_resource;
3561         hold_rsb(r);
3562         lock_rsb(r);
3563
3564         switch (error) {
3565         case -EEXIST:
3566                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3567                 /* fall through */
3568         case 0:
3569                 lkb->lkb_remid = rl->rl_remid;
3570                 break;
3571         default:
3572                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3573                           error, lkb->lkb_id);
3574         }
3575
3576         /* an ack for dlm_recover_locks() which waits for replies from
3577            all the locks it sends to new masters */
3578         dlm_recovered_lock(r);
3579
3580         unlock_rsb(r);
3581         put_rsb(r);
3582         dlm_put_lkb(lkb);
3583
3584         return 0;
3585 }
3586
3587 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3588                      int mode, uint32_t flags, void *name, unsigned int namelen,
3589                      uint32_t parent_lkid)
3590 {
3591         struct dlm_lkb *lkb;
3592         struct dlm_args args;
3593         int error;
3594
3595         lock_recovery(ls);
3596
3597         error = create_lkb(ls, &lkb);
3598         if (error) {
3599                 kfree(ua);
3600                 goto out;
3601         }
3602
3603         if (flags & DLM_LKF_VALBLK) {
3604                 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3605                 if (!ua->lksb.sb_lvbptr) {
3606                         kfree(ua);
3607                         __put_lkb(ls, lkb);
3608                         error = -ENOMEM;
3609                         goto out;
3610                 }
3611         }
3612
3613         /* After ua is attached to lkb it will be freed by free_lkb().
3614            When DLM_IFL_USER is set, the dlm knows that this is a userspace
3615            lock and that lkb_astparam is the dlm_user_args structure. */
3616
3617         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3618                               FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3619         lkb->lkb_flags |= DLM_IFL_USER;
3620         ua->old_mode = DLM_LOCK_IV;
3621
3622         if (error) {
3623                 __put_lkb(ls, lkb);
3624                 goto out;
3625         }
3626
3627         error = request_lock(ls, lkb, name, namelen, &args);
3628
3629         switch (error) {
3630         case 0:
3631                 break;
3632         case -EINPROGRESS:
3633                 error = 0;
3634                 break;
3635         case -EAGAIN:
3636                 error = 0;
3637                 /* fall through */
3638         default:
3639                 __put_lkb(ls, lkb);
3640                 goto out;
3641         }
3642
3643         /* add this new lkb to the per-process list of locks */
3644         spin_lock(&ua->proc->locks_spin);
3645         kref_get(&lkb->lkb_ref);
3646         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3647         spin_unlock(&ua->proc->locks_spin);
3648  out:
3649         unlock_recovery(ls);
3650         return error;
3651 }
3652
3653 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3654                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3655 {
3656         struct dlm_lkb *lkb;
3657         struct dlm_args args;
3658         struct dlm_user_args *ua;
3659         int error;
3660
3661         lock_recovery(ls);
3662
3663         error = find_lkb(ls, lkid, &lkb);
3664         if (error)
3665                 goto out;
3666
3667         /* user can change the params on its lock when it converts it, or
3668            add an lvb that didn't exist before */
3669
3670         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3671
3672         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3673                 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3674                 if (!ua->lksb.sb_lvbptr) {
3675                         error = -ENOMEM;
3676                         goto out_put;
3677                 }
3678         }
3679         if (lvb_in && ua->lksb.sb_lvbptr)
3680                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3681
3682         ua->castparam = ua_tmp->castparam;
3683         ua->castaddr = ua_tmp->castaddr;
3684         ua->bastparam = ua_tmp->bastparam;
3685         ua->bastaddr = ua_tmp->bastaddr;
3686         ua->user_lksb = ua_tmp->user_lksb;
3687         ua->old_mode = lkb->lkb_grmode;
3688
3689         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3690                               FAKE_USER_AST, &args);
3691         if (error)
3692                 goto out_put;
3693
3694         error = convert_lock(ls, lkb, &args);
3695
3696         if (error == -EINPROGRESS || error == -EAGAIN)
3697                 error = 0;
3698  out_put:
3699         dlm_put_lkb(lkb);
3700  out:
3701         unlock_recovery(ls);
3702         kfree(ua_tmp);
3703         return error;
3704 }
3705
3706 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3707                     uint32_t flags, uint32_t lkid, char *lvb_in)
3708 {
3709         struct dlm_lkb *lkb;
3710         struct dlm_args args;
3711         struct dlm_user_args *ua;
3712         int error;
3713
3714         lock_recovery(ls);
3715
3716         error = find_lkb(ls, lkid, &lkb);
3717         if (error)
3718                 goto out;
3719
3720         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3721
3722         if (lvb_in && ua->lksb.sb_lvbptr)
3723                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3724         ua->castparam = ua_tmp->castparam;
3725         ua->user_lksb = ua_tmp->user_lksb;
3726
3727         error = set_unlock_args(flags, ua, &args);
3728         if (error)
3729                 goto out_put;
3730
3731         error = unlock_lock(ls, lkb, &args);
3732
3733         if (error == -DLM_EUNLOCK)
3734                 error = 0;
3735         if (error)
3736                 goto out_put;
3737
3738         spin_lock(&ua->proc->locks_spin);
3739         list_del_init(&lkb->lkb_ownqueue);
3740         spin_unlock(&ua->proc->locks_spin);
3741
3742         /* this removes the reference for the proc->locks list added by
3743            dlm_user_request */
3744         unhold_lkb(lkb);
3745  out_put:
3746         dlm_put_lkb(lkb);
3747  out:
3748         unlock_recovery(ls);
3749         return error;
3750 }
3751
3752 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3753                     uint32_t flags, uint32_t lkid)
3754 {
3755         struct dlm_lkb *lkb;
3756         struct dlm_args args;
3757         struct dlm_user_args *ua;
3758         int error;
3759
3760         lock_recovery(ls);
3761
3762         error = find_lkb(ls, lkid, &lkb);
3763         if (error)
3764                 goto out;
3765
3766         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3767         ua->castparam = ua_tmp->castparam;
3768         ua->user_lksb = ua_tmp->user_lksb;
3769
3770         error = set_unlock_args(flags, ua, &args);
3771         if (error)
3772                 goto out_put;
3773
3774         error = cancel_lock(ls, lkb, &args);
3775
3776         if (error == -DLM_ECANCEL)
3777                 error = 0;
3778         if (error)
3779                 goto out_put;
3780
3781         /* this lkb was removed from the WAITING queue */
3782         if (lkb->lkb_grmode == DLM_LOCK_IV) {
3783                 spin_lock(&ua->proc->locks_spin);
3784                 list_del_init(&lkb->lkb_ownqueue);
3785                 spin_unlock(&ua->proc->locks_spin);
3786                 unhold_lkb(lkb);
3787         }
3788  out_put:
3789         dlm_put_lkb(lkb);
3790  out:
3791         unlock_recovery(ls);
3792         return error;
3793 }
3794
3795 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3796 {
3797         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3798
3799         if (ua->lksb.sb_lvbptr)
3800                 kfree(ua->lksb.sb_lvbptr);
3801         kfree(ua);
3802         lkb->lkb_astparam = (long)NULL;
3803
3804         /* TODO: propogate to master if needed */
3805         return 0;
3806 }
3807
3808 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3809    Regardless of what rsb queue the lock is on, it's removed and freed. */
3810
3811 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3812 {
3813         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3814         struct dlm_args args;
3815         int error;
3816
3817         /* FIXME: we need to handle the case where the lkb is in limbo
3818            while the rsb is being looked up, currently we assert in
3819            _unlock_lock/is_remote because rsb nodeid is -1. */
3820
3821         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3822
3823         error = unlock_lock(ls, lkb, &args);
3824         if (error == -DLM_EUNLOCK)
3825                 error = 0;
3826         return error;
3827 }
3828
3829 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3830    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3831    which we clear here. */
3832
3833 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3834    list, and no more device_writes should add lkb's to proc->locks list; so we
3835    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3836    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3837    them ourself. */
3838
3839 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3840 {
3841         struct dlm_lkb *lkb, *safe;
3842
3843         lock_recovery(ls);
3844         mutex_lock(&ls->ls_clear_proc_locks);
3845
3846         list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3847                 if (lkb->lkb_ast_type) {
3848                         list_del(&lkb->lkb_astqueue);
3849                         unhold_lkb(lkb);
3850                 }
3851
3852                 list_del_init(&lkb->lkb_ownqueue);
3853
3854                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3855                         lkb->lkb_flags |= DLM_IFL_ORPHAN;
3856                         orphan_proc_lock(ls, lkb);
3857                 } else {
3858                         lkb->lkb_flags |= DLM_IFL_DEAD;
3859                         unlock_proc_lock(ls, lkb);
3860                 }
3861
3862                 /* this removes the reference for the proc->locks list
3863                    added by dlm_user_request, it may result in the lkb
3864                    being freed */
3865
3866                 dlm_put_lkb(lkb);
3867         }
3868         mutex_unlock(&ls->ls_clear_proc_locks);
3869         unlock_recovery(ls);
3870 }