]> err.no Git - linux-2.6/blob - fs/dlm/lock.c
[DLM] Remove range locks from the DLM
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58
59 #include "dlm_internal.h"
60 #include "memory.h"
61 #include "lowcomms.h"
62 #include "requestqueue.h"
63 #include "util.h"
64 #include "dir.h"
65 #include "member.h"
66 #include "lockspace.h"
67 #include "ast.h"
68 #include "lock.h"
69 #include "rcom.h"
70 #include "recover.h"
71 #include "lvb_table.h"
72 #include "config.h"
73
74 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
75 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
76 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
80 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_remove(struct dlm_rsb *r);
82 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84                                     struct dlm_message *ms);
85 static int receive_extralen(struct dlm_message *ms);
86
87 /*
88  * Lock compatibilty matrix - thanks Steve
89  * UN = Unlocked state. Not really a state, used as a flag
90  * PD = Padding. Used to make the matrix a nice power of two in size
91  * Other states are the same as the VMS DLM.
92  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
93  */
94
95 static const int __dlm_compat_matrix[8][8] = {
96       /* UN NL CR CW PR PW EX PD */
97         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
98         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
99         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
100         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
101         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
102         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
103         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
104         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
105 };
106
107 /*
108  * This defines the direction of transfer of LVB data.
109  * Granted mode is the row; requested mode is the column.
110  * Usage: matrix[grmode+1][rqmode+1]
111  * 1 = LVB is returned to the caller
112  * 0 = LVB is written to the resource
113  * -1 = nothing happens to the LVB
114  */
115
116 const int dlm_lvb_operations[8][8] = {
117         /* UN   NL  CR  CW  PR  PW  EX  PD*/
118         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
119         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
120         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
121         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
122         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
123         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
124         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
125         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
126 };
127 EXPORT_SYMBOL_GPL(dlm_lvb_operations);
128
129 #define modes_compat(gr, rq) \
130         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
131
132 int dlm_modes_compat(int mode1, int mode2)
133 {
134         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
135 }
136
137 /*
138  * Compatibility matrix for conversions with QUECVT set.
139  * Granted mode is the row; requested mode is the column.
140  * Usage: matrix[grmode+1][rqmode+1]
141  */
142
143 static const int __quecvt_compat_matrix[8][8] = {
144       /* UN NL CR CW PR PW EX PD */
145         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
146         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
147         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
148         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
149         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
150         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
152         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
153 };
154
155 static void dlm_print_lkb(struct dlm_lkb *lkb)
156 {
157         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
159                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
160                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
161                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
162 }
163
164 void dlm_print_rsb(struct dlm_rsb *r)
165 {
166         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
167                r->res_nodeid, r->res_flags, r->res_first_lkid,
168                r->res_recover_locks_count, r->res_name);
169 }
170
171 /* Threads cannot use the lockspace while it's being recovered */
172
173 static inline void lock_recovery(struct dlm_ls *ls)
174 {
175         down_read(&ls->ls_in_recovery);
176 }
177
178 static inline void unlock_recovery(struct dlm_ls *ls)
179 {
180         up_read(&ls->ls_in_recovery);
181 }
182
183 static inline int lock_recovery_try(struct dlm_ls *ls)
184 {
185         return down_read_trylock(&ls->ls_in_recovery);
186 }
187
188 static inline int can_be_queued(struct dlm_lkb *lkb)
189 {
190         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
191 }
192
193 static inline int force_blocking_asts(struct dlm_lkb *lkb)
194 {
195         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
196 }
197
198 static inline int is_demoted(struct dlm_lkb *lkb)
199 {
200         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
201 }
202
203 static inline int is_remote(struct dlm_rsb *r)
204 {
205         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
206         return !!r->res_nodeid;
207 }
208
209 static inline int is_process_copy(struct dlm_lkb *lkb)
210 {
211         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
212 }
213
214 static inline int is_master_copy(struct dlm_lkb *lkb)
215 {
216         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
217                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
218         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
219 }
220
221 static inline int middle_conversion(struct dlm_lkb *lkb)
222 {
223         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
224             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
225                 return 1;
226         return 0;
227 }
228
229 static inline int down_conversion(struct dlm_lkb *lkb)
230 {
231         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
232 }
233
234 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
235 {
236         if (is_master_copy(lkb))
237                 return;
238
239         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
240
241         lkb->lkb_lksb->sb_status = rv;
242         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
243
244         dlm_add_ast(lkb, AST_COMP);
245 }
246
247 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
248 {
249         if (is_master_copy(lkb))
250                 send_bast(r, lkb, rqmode);
251         else {
252                 lkb->lkb_bastmode = rqmode;
253                 dlm_add_ast(lkb, AST_BAST);
254         }
255 }
256
257 /*
258  * Basic operations on rsb's and lkb's
259  */
260
261 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
262 {
263         struct dlm_rsb *r;
264
265         r = allocate_rsb(ls, len);
266         if (!r)
267                 return NULL;
268
269         r->res_ls = ls;
270         r->res_length = len;
271         memcpy(r->res_name, name, len);
272         mutex_init(&r->res_mutex);
273
274         INIT_LIST_HEAD(&r->res_lookup);
275         INIT_LIST_HEAD(&r->res_grantqueue);
276         INIT_LIST_HEAD(&r->res_convertqueue);
277         INIT_LIST_HEAD(&r->res_waitqueue);
278         INIT_LIST_HEAD(&r->res_root_list);
279         INIT_LIST_HEAD(&r->res_recover_list);
280
281         return r;
282 }
283
284 static int search_rsb_list(struct list_head *head, char *name, int len,
285                            unsigned int flags, struct dlm_rsb **r_ret)
286 {
287         struct dlm_rsb *r;
288         int error = 0;
289
290         list_for_each_entry(r, head, res_hashchain) {
291                 if (len == r->res_length && !memcmp(name, r->res_name, len))
292                         goto found;
293         }
294         return -ENOENT;
295
296  found:
297         if (r->res_nodeid && (flags & R_MASTER))
298                 error = -ENOTBLK;
299         *r_ret = r;
300         return error;
301 }
302
303 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
304                        unsigned int flags, struct dlm_rsb **r_ret)
305 {
306         struct dlm_rsb *r;
307         int error;
308
309         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
310         if (!error) {
311                 kref_get(&r->res_ref);
312                 goto out;
313         }
314         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
315         if (error)
316                 goto out;
317
318         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
319
320         if (dlm_no_directory(ls))
321                 goto out;
322
323         if (r->res_nodeid == -1) {
324                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
325                 r->res_first_lkid = 0;
326         } else if (r->res_nodeid > 0) {
327                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
328                 r->res_first_lkid = 0;
329         } else {
330                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
331                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
332         }
333  out:
334         *r_ret = r;
335         return error;
336 }
337
338 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
339                       unsigned int flags, struct dlm_rsb **r_ret)
340 {
341         int error;
342         write_lock(&ls->ls_rsbtbl[b].lock);
343         error = _search_rsb(ls, name, len, b, flags, r_ret);
344         write_unlock(&ls->ls_rsbtbl[b].lock);
345         return error;
346 }
347
348 /*
349  * Find rsb in rsbtbl and potentially create/add one
350  *
351  * Delaying the release of rsb's has a similar benefit to applications keeping
352  * NL locks on an rsb, but without the guarantee that the cached master value
353  * will still be valid when the rsb is reused.  Apps aren't always smart enough
354  * to keep NL locks on an rsb that they may lock again shortly; this can lead
355  * to excessive master lookups and removals if we don't delay the release.
356  *
357  * Searching for an rsb means looking through both the normal list and toss
358  * list.  When found on the toss list the rsb is moved to the normal list with
359  * ref count of 1; when found on normal list the ref count is incremented.
360  */
361
362 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
363                     unsigned int flags, struct dlm_rsb **r_ret)
364 {
365         struct dlm_rsb *r, *tmp;
366         uint32_t hash, bucket;
367         int error = 0;
368
369         if (dlm_no_directory(ls))
370                 flags |= R_CREATE;
371
372         hash = jhash(name, namelen, 0);
373         bucket = hash & (ls->ls_rsbtbl_size - 1);
374
375         error = search_rsb(ls, name, namelen, bucket, flags, &r);
376         if (!error)
377                 goto out;
378
379         if (error == -ENOENT && !(flags & R_CREATE))
380                 goto out;
381
382         /* the rsb was found but wasn't a master copy */
383         if (error == -ENOTBLK)
384                 goto out;
385
386         error = -ENOMEM;
387         r = create_rsb(ls, name, namelen);
388         if (!r)
389                 goto out;
390
391         r->res_hash = hash;
392         r->res_bucket = bucket;
393         r->res_nodeid = -1;
394         kref_init(&r->res_ref);
395
396         /* With no directory, the master can be set immediately */
397         if (dlm_no_directory(ls)) {
398                 int nodeid = dlm_dir_nodeid(r);
399                 if (nodeid == dlm_our_nodeid())
400                         nodeid = 0;
401                 r->res_nodeid = nodeid;
402         }
403
404         write_lock(&ls->ls_rsbtbl[bucket].lock);
405         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
406         if (!error) {
407                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
408                 free_rsb(r);
409                 r = tmp;
410                 goto out;
411         }
412         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
413         write_unlock(&ls->ls_rsbtbl[bucket].lock);
414         error = 0;
415  out:
416         *r_ret = r;
417         return error;
418 }
419
420 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
421                  unsigned int flags, struct dlm_rsb **r_ret)
422 {
423         return find_rsb(ls, name, namelen, flags, r_ret);
424 }
425
426 /* This is only called to add a reference when the code already holds
427    a valid reference to the rsb, so there's no need for locking. */
428
429 static inline void hold_rsb(struct dlm_rsb *r)
430 {
431         kref_get(&r->res_ref);
432 }
433
434 void dlm_hold_rsb(struct dlm_rsb *r)
435 {
436         hold_rsb(r);
437 }
438
439 static void toss_rsb(struct kref *kref)
440 {
441         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
442         struct dlm_ls *ls = r->res_ls;
443
444         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
445         kref_init(&r->res_ref);
446         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
447         r->res_toss_time = jiffies;
448         if (r->res_lvbptr) {
449                 free_lvb(r->res_lvbptr);
450                 r->res_lvbptr = NULL;
451         }
452 }
453
454 /* When all references to the rsb are gone it's transfered to
455    the tossed list for later disposal. */
456
457 static void put_rsb(struct dlm_rsb *r)
458 {
459         struct dlm_ls *ls = r->res_ls;
460         uint32_t bucket = r->res_bucket;
461
462         write_lock(&ls->ls_rsbtbl[bucket].lock);
463         kref_put(&r->res_ref, toss_rsb);
464         write_unlock(&ls->ls_rsbtbl[bucket].lock);
465 }
466
467 void dlm_put_rsb(struct dlm_rsb *r)
468 {
469         put_rsb(r);
470 }
471
472 /* See comment for unhold_lkb */
473
474 static void unhold_rsb(struct dlm_rsb *r)
475 {
476         int rv;
477         rv = kref_put(&r->res_ref, toss_rsb);
478         DLM_ASSERT(!rv, dlm_print_rsb(r););
479 }
480
481 static void kill_rsb(struct kref *kref)
482 {
483         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
484
485         /* All work is done after the return from kref_put() so we
486            can release the write_lock before the remove and free. */
487
488         DLM_ASSERT(list_empty(&r->res_lookup),);
489         DLM_ASSERT(list_empty(&r->res_grantqueue),);
490         DLM_ASSERT(list_empty(&r->res_convertqueue),);
491         DLM_ASSERT(list_empty(&r->res_waitqueue),);
492         DLM_ASSERT(list_empty(&r->res_root_list),);
493         DLM_ASSERT(list_empty(&r->res_recover_list),);
494 }
495
496 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
497    The rsb must exist as long as any lkb's for it do. */
498
499 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
500 {
501         hold_rsb(r);
502         lkb->lkb_resource = r;
503 }
504
505 static void detach_lkb(struct dlm_lkb *lkb)
506 {
507         if (lkb->lkb_resource) {
508                 put_rsb(lkb->lkb_resource);
509                 lkb->lkb_resource = NULL;
510         }
511 }
512
513 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
514 {
515         struct dlm_lkb *lkb, *tmp;
516         uint32_t lkid = 0;
517         uint16_t bucket;
518
519         lkb = allocate_lkb(ls);
520         if (!lkb)
521                 return -ENOMEM;
522
523         lkb->lkb_nodeid = -1;
524         lkb->lkb_grmode = DLM_LOCK_IV;
525         kref_init(&lkb->lkb_ref);
526
527         get_random_bytes(&bucket, sizeof(bucket));
528         bucket &= (ls->ls_lkbtbl_size - 1);
529
530         write_lock(&ls->ls_lkbtbl[bucket].lock);
531
532         /* counter can roll over so we must verify lkid is not in use */
533
534         while (lkid == 0) {
535                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
536
537                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
538                                     lkb_idtbl_list) {
539                         if (tmp->lkb_id != lkid)
540                                 continue;
541                         lkid = 0;
542                         break;
543                 }
544         }
545
546         lkb->lkb_id = lkid;
547         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
548         write_unlock(&ls->ls_lkbtbl[bucket].lock);
549
550         *lkb_ret = lkb;
551         return 0;
552 }
553
554 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
555 {
556         uint16_t bucket = lkid & 0xFFFF;
557         struct dlm_lkb *lkb;
558
559         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
560                 if (lkb->lkb_id == lkid)
561                         return lkb;
562         }
563         return NULL;
564 }
565
566 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
567 {
568         struct dlm_lkb *lkb;
569         uint16_t bucket = lkid & 0xFFFF;
570
571         if (bucket >= ls->ls_lkbtbl_size)
572                 return -EBADSLT;
573
574         read_lock(&ls->ls_lkbtbl[bucket].lock);
575         lkb = __find_lkb(ls, lkid);
576         if (lkb)
577                 kref_get(&lkb->lkb_ref);
578         read_unlock(&ls->ls_lkbtbl[bucket].lock);
579
580         *lkb_ret = lkb;
581         return lkb ? 0 : -ENOENT;
582 }
583
584 static void kill_lkb(struct kref *kref)
585 {
586         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
587
588         /* All work is done after the return from kref_put() so we
589            can release the write_lock before the detach_lkb */
590
591         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
592 }
593
594 static int put_lkb(struct dlm_lkb *lkb)
595 {
596         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
597         uint16_t bucket = lkb->lkb_id & 0xFFFF;
598
599         write_lock(&ls->ls_lkbtbl[bucket].lock);
600         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
601                 list_del(&lkb->lkb_idtbl_list);
602                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
603
604                 detach_lkb(lkb);
605
606                 /* for local/process lkbs, lvbptr points to caller's lksb */
607                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
608                         free_lvb(lkb->lkb_lvbptr);
609                 free_lkb(lkb);
610                 return 1;
611         } else {
612                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
613                 return 0;
614         }
615 }
616
617 int dlm_put_lkb(struct dlm_lkb *lkb)
618 {
619         return put_lkb(lkb);
620 }
621
622 /* This is only called to add a reference when the code already holds
623    a valid reference to the lkb, so there's no need for locking. */
624
625 static inline void hold_lkb(struct dlm_lkb *lkb)
626 {
627         kref_get(&lkb->lkb_ref);
628 }
629
630 /* This is called when we need to remove a reference and are certain
631    it's not the last ref.  e.g. del_lkb is always called between a
632    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
633    put_lkb would work fine, but would involve unnecessary locking */
634
635 static inline void unhold_lkb(struct dlm_lkb *lkb)
636 {
637         int rv;
638         rv = kref_put(&lkb->lkb_ref, kill_lkb);
639         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
640 }
641
642 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
643                             int mode)
644 {
645         struct dlm_lkb *lkb = NULL;
646
647         list_for_each_entry(lkb, head, lkb_statequeue)
648                 if (lkb->lkb_rqmode < mode)
649                         break;
650
651         if (!lkb)
652                 list_add_tail(new, head);
653         else
654                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
655 }
656
657 /* add/remove lkb to rsb's grant/convert/wait queue */
658
659 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
660 {
661         kref_get(&lkb->lkb_ref);
662
663         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
664
665         lkb->lkb_status = status;
666
667         switch (status) {
668         case DLM_LKSTS_WAITING:
669                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
670                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
671                 else
672                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
673                 break;
674         case DLM_LKSTS_GRANTED:
675                 /* convention says granted locks kept in order of grmode */
676                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
677                                 lkb->lkb_grmode);
678                 break;
679         case DLM_LKSTS_CONVERT:
680                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
681                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
682                 else
683                         list_add_tail(&lkb->lkb_statequeue,
684                                       &r->res_convertqueue);
685                 break;
686         default:
687                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
688         }
689 }
690
691 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
692 {
693         lkb->lkb_status = 0;
694         list_del(&lkb->lkb_statequeue);
695         unhold_lkb(lkb);
696 }
697
698 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
699 {
700         hold_lkb(lkb);
701         del_lkb(r, lkb);
702         add_lkb(r, lkb, sts);
703         unhold_lkb(lkb);
704 }
705
706 /* add/remove lkb from global waiters list of lkb's waiting for
707    a reply from a remote node */
708
709 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
710 {
711         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
712
713         mutex_lock(&ls->ls_waiters_mutex);
714         if (lkb->lkb_wait_type) {
715                 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
716                 goto out;
717         }
718         lkb->lkb_wait_type = mstype;
719         kref_get(&lkb->lkb_ref);
720         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
721  out:
722         mutex_unlock(&ls->ls_waiters_mutex);
723 }
724
725 static int _remove_from_waiters(struct dlm_lkb *lkb)
726 {
727         int error = 0;
728
729         if (!lkb->lkb_wait_type) {
730                 log_print("remove_from_waiters error");
731                 error = -EINVAL;
732                 goto out;
733         }
734         lkb->lkb_wait_type = 0;
735         list_del(&lkb->lkb_wait_reply);
736         unhold_lkb(lkb);
737  out:
738         return error;
739 }
740
741 static int remove_from_waiters(struct dlm_lkb *lkb)
742 {
743         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
744         int error;
745
746         mutex_lock(&ls->ls_waiters_mutex);
747         error = _remove_from_waiters(lkb);
748         mutex_unlock(&ls->ls_waiters_mutex);
749         return error;
750 }
751
752 static void dir_remove(struct dlm_rsb *r)
753 {
754         int to_nodeid;
755
756         if (dlm_no_directory(r->res_ls))
757                 return;
758
759         to_nodeid = dlm_dir_nodeid(r);
760         if (to_nodeid != dlm_our_nodeid())
761                 send_remove(r);
762         else
763                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
764                                      r->res_name, r->res_length);
765 }
766
767 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
768    found since they are in order of newest to oldest? */
769
770 static int shrink_bucket(struct dlm_ls *ls, int b)
771 {
772         struct dlm_rsb *r;
773         int count = 0, found;
774
775         for (;;) {
776                 found = 0;
777                 write_lock(&ls->ls_rsbtbl[b].lock);
778                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
779                                             res_hashchain) {
780                         if (!time_after_eq(jiffies, r->res_toss_time +
781                                            dlm_config.toss_secs * HZ))
782                                 continue;
783                         found = 1;
784                         break;
785                 }
786
787                 if (!found) {
788                         write_unlock(&ls->ls_rsbtbl[b].lock);
789                         break;
790                 }
791
792                 if (kref_put(&r->res_ref, kill_rsb)) {
793                         list_del(&r->res_hashchain);
794                         write_unlock(&ls->ls_rsbtbl[b].lock);
795
796                         if (is_master(r))
797                                 dir_remove(r);
798                         free_rsb(r);
799                         count++;
800                 } else {
801                         write_unlock(&ls->ls_rsbtbl[b].lock);
802                         log_error(ls, "tossed rsb in use %s", r->res_name);
803                 }
804         }
805
806         return count;
807 }
808
809 void dlm_scan_rsbs(struct dlm_ls *ls)
810 {
811         int i;
812
813         if (dlm_locking_stopped(ls))
814                 return;
815
816         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
817                 shrink_bucket(ls, i);
818                 cond_resched();
819         }
820 }
821
822 /* lkb is master or local copy */
823
824 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
825 {
826         int b, len = r->res_ls->ls_lvblen;
827
828         /* b=1 lvb returned to caller
829            b=0 lvb written to rsb or invalidated
830            b=-1 do nothing */
831
832         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
833
834         if (b == 1) {
835                 if (!lkb->lkb_lvbptr)
836                         return;
837
838                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
839                         return;
840
841                 if (!r->res_lvbptr)
842                         return;
843
844                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
845                 lkb->lkb_lvbseq = r->res_lvbseq;
846
847         } else if (b == 0) {
848                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
849                         rsb_set_flag(r, RSB_VALNOTVALID);
850                         return;
851                 }
852
853                 if (!lkb->lkb_lvbptr)
854                         return;
855
856                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
857                         return;
858
859                 if (!r->res_lvbptr)
860                         r->res_lvbptr = allocate_lvb(r->res_ls);
861
862                 if (!r->res_lvbptr)
863                         return;
864
865                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
866                 r->res_lvbseq++;
867                 lkb->lkb_lvbseq = r->res_lvbseq;
868                 rsb_clear_flag(r, RSB_VALNOTVALID);
869         }
870
871         if (rsb_flag(r, RSB_VALNOTVALID))
872                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
873 }
874
875 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
876 {
877         if (lkb->lkb_grmode < DLM_LOCK_PW)
878                 return;
879
880         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
881                 rsb_set_flag(r, RSB_VALNOTVALID);
882                 return;
883         }
884
885         if (!lkb->lkb_lvbptr)
886                 return;
887
888         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
889                 return;
890
891         if (!r->res_lvbptr)
892                 r->res_lvbptr = allocate_lvb(r->res_ls);
893
894         if (!r->res_lvbptr)
895                 return;
896
897         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
898         r->res_lvbseq++;
899         rsb_clear_flag(r, RSB_VALNOTVALID);
900 }
901
902 /* lkb is process copy (pc) */
903
904 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
905                             struct dlm_message *ms)
906 {
907         int b;
908
909         if (!lkb->lkb_lvbptr)
910                 return;
911
912         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
913                 return;
914
915         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
916         if (b == 1) {
917                 int len = receive_extralen(ms);
918                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
919                 lkb->lkb_lvbseq = ms->m_lvbseq;
920         }
921 }
922
923 /* Manipulate lkb's on rsb's convert/granted/waiting queues
924    remove_lock -- used for unlock, removes lkb from granted
925    revert_lock -- used for cancel, moves lkb from convert to granted
926    grant_lock  -- used for request and convert, adds lkb to granted or
927                   moves lkb from convert or waiting to granted
928
929    Each of these is used for master or local copy lkb's.  There is
930    also a _pc() variation used to make the corresponding change on
931    a process copy (pc) lkb. */
932
933 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
934 {
935         del_lkb(r, lkb);
936         lkb->lkb_grmode = DLM_LOCK_IV;
937         /* this unhold undoes the original ref from create_lkb()
938            so this leads to the lkb being freed */
939         unhold_lkb(lkb);
940 }
941
942 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
943 {
944         set_lvb_unlock(r, lkb);
945         _remove_lock(r, lkb);
946 }
947
948 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
949 {
950         _remove_lock(r, lkb);
951 }
952
953 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
954 {
955         lkb->lkb_rqmode = DLM_LOCK_IV;
956
957         switch (lkb->lkb_status) {
958         case DLM_LKSTS_CONVERT:
959                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
960                 break;
961         case DLM_LKSTS_WAITING:
962                 del_lkb(r, lkb);
963                 lkb->lkb_grmode = DLM_LOCK_IV;
964                 /* this unhold undoes the original ref from create_lkb()
965                    so this leads to the lkb being freed */
966                 unhold_lkb(lkb);
967                 break;
968         default:
969                 log_print("invalid status for revert %d", lkb->lkb_status);
970         }
971 }
972
973 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
974 {
975         revert_lock(r, lkb);
976 }
977
978 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
979 {
980         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
981                 lkb->lkb_grmode = lkb->lkb_rqmode;
982                 if (lkb->lkb_status)
983                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
984                 else
985                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
986         }
987
988         lkb->lkb_rqmode = DLM_LOCK_IV;
989 }
990
991 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
992 {
993         set_lvb_lock(r, lkb);
994         _grant_lock(r, lkb);
995         lkb->lkb_highbast = 0;
996 }
997
998 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
999                           struct dlm_message *ms)
1000 {
1001         set_lvb_lock_pc(r, lkb, ms);
1002         _grant_lock(r, lkb);
1003 }
1004
1005 /* called by grant_pending_locks() which means an async grant message must
1006    be sent to the requesting node in addition to granting the lock if the
1007    lkb belongs to a remote node. */
1008
1009 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1010 {
1011         grant_lock(r, lkb);
1012         if (is_master_copy(lkb))
1013                 send_grant(r, lkb);
1014         else
1015                 queue_cast(r, lkb, 0);
1016 }
1017
1018 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1019 {
1020         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1021                                            lkb_statequeue);
1022         if (lkb->lkb_id == first->lkb_id)
1023                 return 1;
1024
1025         return 0;
1026 }
1027
1028 /* Check if the given lkb conflicts with another lkb on the queue. */
1029
1030 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1031 {
1032         struct dlm_lkb *this;
1033
1034         list_for_each_entry(this, head, lkb_statequeue) {
1035                 if (this == lkb)
1036                         continue;
1037                 if (!modes_compat(this, lkb))
1038                         return 1;
1039         }
1040         return 0;
1041 }
1042
1043 /*
1044  * "A conversion deadlock arises with a pair of lock requests in the converting
1045  * queue for one resource.  The granted mode of each lock blocks the requested
1046  * mode of the other lock."
1047  *
1048  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1049  * convert queue from being granted, then demote lkb (set grmode to NL).
1050  * This second form requires that we check for conv-deadlk even when
1051  * now == 0 in _can_be_granted().
1052  *
1053  * Example:
1054  * Granted Queue: empty
1055  * Convert Queue: NL->EX (first lock)
1056  *                PR->EX (second lock)
1057  *
1058  * The first lock can't be granted because of the granted mode of the second
1059  * lock and the second lock can't be granted because it's not first in the
1060  * list.  We demote the granted mode of the second lock (the lkb passed to this
1061  * function).
1062  *
1063  * After the resolution, the "grant pending" function needs to go back and try
1064  * to grant locks on the convert queue again since the first lock can now be
1065  * granted.
1066  */
1067
1068 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1069 {
1070         struct dlm_lkb *this, *first = NULL, *self = NULL;
1071
1072         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1073                 if (!first)
1074                         first = this;
1075                 if (this == lkb) {
1076                         self = lkb;
1077                         continue;
1078                 }
1079
1080                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1081                         return 1;
1082         }
1083
1084         /* if lkb is on the convert queue and is preventing the first
1085            from being granted, then there's deadlock and we demote lkb.
1086            multiple converting locks may need to do this before the first
1087            converting lock can be granted. */
1088
1089         if (self && self != first) {
1090                 if (!modes_compat(lkb, first) &&
1091                     !queue_conflict(&rsb->res_grantqueue, first))
1092                         return 1;
1093         }
1094
1095         return 0;
1096 }
1097
1098 /*
1099  * Return 1 if the lock can be granted, 0 otherwise.
1100  * Also detect and resolve conversion deadlocks.
1101  *
1102  * lkb is the lock to be granted
1103  *
1104  * now is 1 if the function is being called in the context of the
1105  * immediate request, it is 0 if called later, after the lock has been
1106  * queued.
1107  *
1108  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1109  */
1110
1111 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1112 {
1113         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1114
1115         /*
1116          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1117          * a new request for a NL mode lock being blocked.
1118          *
1119          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1120          * request, then it would be granted.  In essence, the use of this flag
1121          * tells the Lock Manager to expedite theis request by not considering
1122          * what may be in the CONVERTING or WAITING queues...  As of this
1123          * writing, the EXPEDITE flag can be used only with new requests for NL
1124          * mode locks.  This flag is not valid for conversion requests.
1125          *
1126          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1127          * conversion or used with a non-NL requested mode.  We also know an
1128          * EXPEDITE request is always granted immediately, so now must always
1129          * be 1.  The full condition to grant an expedite request: (now &&
1130          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1131          * therefore be shortened to just checking the flag.
1132          */
1133
1134         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1135                 return 1;
1136
1137         /*
1138          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1139          * added to the remaining conditions.
1140          */
1141
1142         if (queue_conflict(&r->res_grantqueue, lkb))
1143                 goto out;
1144
1145         /*
1146          * 6-3: By default, a conversion request is immediately granted if the
1147          * requested mode is compatible with the modes of all other granted
1148          * locks
1149          */
1150
1151         if (queue_conflict(&r->res_convertqueue, lkb))
1152                 goto out;
1153
1154         /*
1155          * 6-5: But the default algorithm for deciding whether to grant or
1156          * queue conversion requests does not by itself guarantee that such
1157          * requests are serviced on a "first come first serve" basis.  This, in
1158          * turn, can lead to a phenomenon known as "indefinate postponement".
1159          *
1160          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1161          * the system service employed to request a lock conversion.  This flag
1162          * forces certain conversion requests to be queued, even if they are
1163          * compatible with the granted modes of other locks on the same
1164          * resource.  Thus, the use of this flag results in conversion requests
1165          * being ordered on a "first come first servce" basis.
1166          *
1167          * DCT: This condition is all about new conversions being able to occur
1168          * "in place" while the lock remains on the granted queue (assuming
1169          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1170          * doesn't _have_ to go onto the convert queue where it's processed in
1171          * order.  The "now" variable is necessary to distinguish converts
1172          * being received and processed for the first time now, because once a
1173          * convert is moved to the conversion queue the condition below applies
1174          * requiring fifo granting.
1175          */
1176
1177         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1178                 return 1;
1179
1180         /*
1181          * The NOORDER flag is set to avoid the standard vms rules on grant
1182          * order.
1183          */
1184
1185         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1186                 return 1;
1187
1188         /*
1189          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1190          * granted until all other conversion requests ahead of it are granted
1191          * and/or canceled.
1192          */
1193
1194         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1195                 return 1;
1196
1197         /*
1198          * 6-4: By default, a new request is immediately granted only if all
1199          * three of the following conditions are satisfied when the request is
1200          * issued:
1201          * - The queue of ungranted conversion requests for the resource is
1202          *   empty.
1203          * - The queue of ungranted new requests for the resource is empty.
1204          * - The mode of the new request is compatible with the most
1205          *   restrictive mode of all granted locks on the resource.
1206          */
1207
1208         if (now && !conv && list_empty(&r->res_convertqueue) &&
1209             list_empty(&r->res_waitqueue))
1210                 return 1;
1211
1212         /*
1213          * 6-4: Once a lock request is in the queue of ungranted new requests,
1214          * it cannot be granted until the queue of ungranted conversion
1215          * requests is empty, all ungranted new requests ahead of it are
1216          * granted and/or canceled, and it is compatible with the granted mode
1217          * of the most restrictive lock granted on the resource.
1218          */
1219
1220         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1221             first_in_list(lkb, &r->res_waitqueue))
1222                 return 1;
1223
1224  out:
1225         /*
1226          * The following, enabled by CONVDEADLK, departs from VMS.
1227          */
1228
1229         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1230             conversion_deadlock_detect(r, lkb)) {
1231                 lkb->lkb_grmode = DLM_LOCK_NL;
1232                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1233         }
1234
1235         return 0;
1236 }
1237
1238 /*
1239  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1240  * simple way to provide a big optimization to applications that can use them.
1241  */
1242
1243 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1244 {
1245         uint32_t flags = lkb->lkb_exflags;
1246         int rv;
1247         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1248
1249         rv = _can_be_granted(r, lkb, now);
1250         if (rv)
1251                 goto out;
1252
1253         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1254                 goto out;
1255
1256         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1257                 alt = DLM_LOCK_PR;
1258         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1259                 alt = DLM_LOCK_CW;
1260
1261         if (alt) {
1262                 lkb->lkb_rqmode = alt;
1263                 rv = _can_be_granted(r, lkb, now);
1264                 if (rv)
1265                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1266                 else
1267                         lkb->lkb_rqmode = rqmode;
1268         }
1269  out:
1270         return rv;
1271 }
1272
1273 static int grant_pending_convert(struct dlm_rsb *r, int high)
1274 {
1275         struct dlm_lkb *lkb, *s;
1276         int hi, demoted, quit, grant_restart, demote_restart;
1277
1278         quit = 0;
1279  restart:
1280         grant_restart = 0;
1281         demote_restart = 0;
1282         hi = DLM_LOCK_IV;
1283
1284         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1285                 demoted = is_demoted(lkb);
1286                 if (can_be_granted(r, lkb, 0)) {
1287                         grant_lock_pending(r, lkb);
1288                         grant_restart = 1;
1289                 } else {
1290                         hi = max_t(int, lkb->lkb_rqmode, hi);
1291                         if (!demoted && is_demoted(lkb))
1292                                 demote_restart = 1;
1293                 }
1294         }
1295
1296         if (grant_restart)
1297                 goto restart;
1298         if (demote_restart && !quit) {
1299                 quit = 1;
1300                 goto restart;
1301         }
1302
1303         return max_t(int, high, hi);
1304 }
1305
1306 static int grant_pending_wait(struct dlm_rsb *r, int high)
1307 {
1308         struct dlm_lkb *lkb, *s;
1309
1310         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1311                 if (can_be_granted(r, lkb, 0))
1312                         grant_lock_pending(r, lkb);
1313                 else
1314                         high = max_t(int, lkb->lkb_rqmode, high);
1315         }
1316
1317         return high;
1318 }
1319
1320 static void grant_pending_locks(struct dlm_rsb *r)
1321 {
1322         struct dlm_lkb *lkb, *s;
1323         int high = DLM_LOCK_IV;
1324
1325         DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1326
1327         high = grant_pending_convert(r, high);
1328         high = grant_pending_wait(r, high);
1329
1330         if (high == DLM_LOCK_IV)
1331                 return;
1332
1333         /*
1334          * If there are locks left on the wait/convert queue then send blocking
1335          * ASTs to granted locks based on the largest requested mode (high)
1336          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1337          */
1338
1339         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1340                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1341                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1342                         queue_bast(r, lkb, high);
1343                         lkb->lkb_highbast = high;
1344                 }
1345         }
1346 }
1347
1348 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1349                             struct dlm_lkb *lkb)
1350 {
1351         struct dlm_lkb *gr;
1352
1353         list_for_each_entry(gr, head, lkb_statequeue) {
1354                 if (gr->lkb_bastaddr &&
1355                     gr->lkb_highbast < lkb->lkb_rqmode &&
1356                     !modes_compat(gr, lkb)) {
1357                         queue_bast(r, gr, lkb->lkb_rqmode);
1358                         gr->lkb_highbast = lkb->lkb_rqmode;
1359                 }
1360         }
1361 }
1362
1363 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1364 {
1365         send_bast_queue(r, &r->res_grantqueue, lkb);
1366 }
1367
1368 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1369 {
1370         send_bast_queue(r, &r->res_grantqueue, lkb);
1371         send_bast_queue(r, &r->res_convertqueue, lkb);
1372 }
1373
1374 /* set_master(r, lkb) -- set the master nodeid of a resource
1375
1376    The purpose of this function is to set the nodeid field in the given
1377    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1378    known, it can just be copied to the lkb and the function will return
1379    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1380    before it can be copied to the lkb.
1381
1382    When the rsb nodeid is being looked up remotely, the initial lkb
1383    causing the lookup is kept on the ls_waiters list waiting for the
1384    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1385    on the rsb's res_lookup list until the master is verified.
1386
1387    Return values:
1388    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1389    1: the rsb master is not available and the lkb has been placed on
1390       a wait queue
1391 */
1392
1393 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1394 {
1395         struct dlm_ls *ls = r->res_ls;
1396         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1397
1398         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1399                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1400                 r->res_first_lkid = lkb->lkb_id;
1401                 lkb->lkb_nodeid = r->res_nodeid;
1402                 return 0;
1403         }
1404
1405         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1406                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1407                 return 1;
1408         }
1409
1410         if (r->res_nodeid == 0) {
1411                 lkb->lkb_nodeid = 0;
1412                 return 0;
1413         }
1414
1415         if (r->res_nodeid > 0) {
1416                 lkb->lkb_nodeid = r->res_nodeid;
1417                 return 0;
1418         }
1419
1420         DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1421
1422         dir_nodeid = dlm_dir_nodeid(r);
1423
1424         if (dir_nodeid != our_nodeid) {
1425                 r->res_first_lkid = lkb->lkb_id;
1426                 send_lookup(r, lkb);
1427                 return 1;
1428         }
1429
1430         for (;;) {
1431                 /* It's possible for dlm_scand to remove an old rsb for
1432                    this same resource from the toss list, us to create
1433                    a new one, look up the master locally, and find it
1434                    already exists just before dlm_scand does the
1435                    dir_remove() on the previous rsb. */
1436
1437                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1438                                        r->res_length, &ret_nodeid);
1439                 if (!error)
1440                         break;
1441                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1442                 schedule();
1443         }
1444
1445         if (ret_nodeid == our_nodeid) {
1446                 r->res_first_lkid = 0;
1447                 r->res_nodeid = 0;
1448                 lkb->lkb_nodeid = 0;
1449         } else {
1450                 r->res_first_lkid = lkb->lkb_id;
1451                 r->res_nodeid = ret_nodeid;
1452                 lkb->lkb_nodeid = ret_nodeid;
1453         }
1454         return 0;
1455 }
1456
1457 static void process_lookup_list(struct dlm_rsb *r)
1458 {
1459         struct dlm_lkb *lkb, *safe;
1460
1461         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1462                 list_del(&lkb->lkb_rsb_lookup);
1463                 _request_lock(r, lkb);
1464                 schedule();
1465         }
1466 }
1467
1468 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1469
1470 static void confirm_master(struct dlm_rsb *r, int error)
1471 {
1472         struct dlm_lkb *lkb;
1473
1474         if (!r->res_first_lkid)
1475                 return;
1476
1477         switch (error) {
1478         case 0:
1479         case -EINPROGRESS:
1480                 r->res_first_lkid = 0;
1481                 process_lookup_list(r);
1482                 break;
1483
1484         case -EAGAIN:
1485                 /* the remote master didn't queue our NOQUEUE request;
1486                    make a waiting lkb the first_lkid */
1487
1488                 r->res_first_lkid = 0;
1489
1490                 if (!list_empty(&r->res_lookup)) {
1491                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1492                                          lkb_rsb_lookup);
1493                         list_del(&lkb->lkb_rsb_lookup);
1494                         r->res_first_lkid = lkb->lkb_id;
1495                         _request_lock(r, lkb);
1496                 } else
1497                         r->res_nodeid = -1;
1498                 break;
1499
1500         default:
1501                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1502         }
1503 }
1504
1505 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1506                          int namelen, uint32_t parent_lkid, void *ast,
1507                          void *astarg, void *bast, struct dlm_args *args)
1508 {
1509         int rv = -EINVAL;
1510
1511         /* check for invalid arg usage */
1512
1513         if (mode < 0 || mode > DLM_LOCK_EX)
1514                 goto out;
1515
1516         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1517                 goto out;
1518
1519         if (flags & DLM_LKF_CANCEL)
1520                 goto out;
1521
1522         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1523                 goto out;
1524
1525         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1526                 goto out;
1527
1528         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1529                 goto out;
1530
1531         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1532                 goto out;
1533
1534         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1535                 goto out;
1536
1537         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1538                 goto out;
1539
1540         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1541                 goto out;
1542
1543         if (!ast || !lksb)
1544                 goto out;
1545
1546         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1547                 goto out;
1548
1549         /* parent/child locks not yet supported */
1550         if (parent_lkid)
1551                 goto out;
1552
1553         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1554                 goto out;
1555
1556         /* these args will be copied to the lkb in validate_lock_args,
1557            it cannot be done now because when converting locks, fields in
1558            an active lkb cannot be modified before locking the rsb */
1559
1560         args->flags = flags;
1561         args->astaddr = ast;
1562         args->astparam = (long) astarg;
1563         args->bastaddr = bast;
1564         args->mode = mode;
1565         args->lksb = lksb;
1566         rv = 0;
1567  out:
1568         return rv;
1569 }
1570
1571 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1572 {
1573         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1574                       DLM_LKF_FORCEUNLOCK))
1575                 return -EINVAL;
1576
1577         args->flags = flags;
1578         args->astparam = (long) astarg;
1579         return 0;
1580 }
1581
1582 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1583                               struct dlm_args *args)
1584 {
1585         int rv = -EINVAL;
1586
1587         if (args->flags & DLM_LKF_CONVERT) {
1588                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1589                         goto out;
1590
1591                 if (args->flags & DLM_LKF_QUECVT &&
1592                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1593                         goto out;
1594
1595                 rv = -EBUSY;
1596                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1597                         goto out;
1598
1599                 if (lkb->lkb_wait_type)
1600                         goto out;
1601         }
1602
1603         lkb->lkb_exflags = args->flags;
1604         lkb->lkb_sbflags = 0;
1605         lkb->lkb_astaddr = args->astaddr;
1606         lkb->lkb_astparam = args->astparam;
1607         lkb->lkb_bastaddr = args->bastaddr;
1608         lkb->lkb_rqmode = args->mode;
1609         lkb->lkb_lksb = args->lksb;
1610         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1611         lkb->lkb_ownpid = (int) current->pid;
1612         rv = 0;
1613  out:
1614         return rv;
1615 }
1616
1617 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1618 {
1619         int rv = -EINVAL;
1620
1621         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1622                 goto out;
1623
1624         if (args->flags & DLM_LKF_FORCEUNLOCK)
1625                 goto out_ok;
1626
1627         if (args->flags & DLM_LKF_CANCEL &&
1628             lkb->lkb_status == DLM_LKSTS_GRANTED)
1629                 goto out;
1630
1631         if (!(args->flags & DLM_LKF_CANCEL) &&
1632             lkb->lkb_status != DLM_LKSTS_GRANTED)
1633                 goto out;
1634
1635         rv = -EBUSY;
1636         if (lkb->lkb_wait_type)
1637                 goto out;
1638
1639  out_ok:
1640         lkb->lkb_exflags = args->flags;
1641         lkb->lkb_sbflags = 0;
1642         lkb->lkb_astparam = args->astparam;
1643
1644         rv = 0;
1645  out:
1646         return rv;
1647 }
1648
1649 /*
1650  * Four stage 4 varieties:
1651  * do_request(), do_convert(), do_unlock(), do_cancel()
1652  * These are called on the master node for the given lock and
1653  * from the central locking logic.
1654  */
1655
1656 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1657 {
1658         int error = 0;
1659
1660         if (can_be_granted(r, lkb, 1)) {
1661                 grant_lock(r, lkb);
1662                 queue_cast(r, lkb, 0);
1663                 goto out;
1664         }
1665
1666         if (can_be_queued(lkb)) {
1667                 error = -EINPROGRESS;
1668                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1669                 send_blocking_asts(r, lkb);
1670                 goto out;
1671         }
1672
1673         error = -EAGAIN;
1674         if (force_blocking_asts(lkb))
1675                 send_blocking_asts_all(r, lkb);
1676         queue_cast(r, lkb, -EAGAIN);
1677
1678  out:
1679         return error;
1680 }
1681
1682 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1683 {
1684         int error = 0;
1685
1686         /* changing an existing lock may allow others to be granted */
1687
1688         if (can_be_granted(r, lkb, 1)) {
1689                 grant_lock(r, lkb);
1690                 queue_cast(r, lkb, 0);
1691                 grant_pending_locks(r);
1692                 goto out;
1693         }
1694
1695         if (can_be_queued(lkb)) {
1696                 if (is_demoted(lkb))
1697                         grant_pending_locks(r);
1698                 error = -EINPROGRESS;
1699                 del_lkb(r, lkb);
1700                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1701                 send_blocking_asts(r, lkb);
1702                 goto out;
1703         }
1704
1705         error = -EAGAIN;
1706         if (force_blocking_asts(lkb))
1707                 send_blocking_asts_all(r, lkb);
1708         queue_cast(r, lkb, -EAGAIN);
1709
1710  out:
1711         return error;
1712 }
1713
1714 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1715 {
1716         remove_lock(r, lkb);
1717         queue_cast(r, lkb, -DLM_EUNLOCK);
1718         grant_pending_locks(r);
1719         return -DLM_EUNLOCK;
1720 }
1721
1722 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723 {
1724         revert_lock(r, lkb);
1725         queue_cast(r, lkb, -DLM_ECANCEL);
1726         grant_pending_locks(r);
1727         return -DLM_ECANCEL;
1728 }
1729
1730 /*
1731  * Four stage 3 varieties:
1732  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1733  */
1734
1735 /* add a new lkb to a possibly new rsb, called by requesting process */
1736
1737 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1738 {
1739         int error;
1740
1741         /* set_master: sets lkb nodeid from r */
1742
1743         error = set_master(r, lkb);
1744         if (error < 0)
1745                 goto out;
1746         if (error) {
1747                 error = 0;
1748                 goto out;
1749         }
1750
1751         if (is_remote(r))
1752                 /* receive_request() calls do_request() on remote node */
1753                 error = send_request(r, lkb);
1754         else
1755                 error = do_request(r, lkb);
1756  out:
1757         return error;
1758 }
1759
1760 /* change some property of an existing lkb, e.g. mode */
1761
1762 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1763 {
1764         int error;
1765
1766         if (is_remote(r))
1767                 /* receive_convert() calls do_convert() on remote node */
1768                 error = send_convert(r, lkb);
1769         else
1770                 error = do_convert(r, lkb);
1771
1772         return error;
1773 }
1774
1775 /* remove an existing lkb from the granted queue */
1776
1777 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1778 {
1779         int error;
1780
1781         if (is_remote(r))
1782                 /* receive_unlock() calls do_unlock() on remote node */
1783                 error = send_unlock(r, lkb);
1784         else
1785                 error = do_unlock(r, lkb);
1786
1787         return error;
1788 }
1789
1790 /* remove an existing lkb from the convert or wait queue */
1791
1792 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1793 {
1794         int error;
1795
1796         if (is_remote(r))
1797                 /* receive_cancel() calls do_cancel() on remote node */
1798                 error = send_cancel(r, lkb);
1799         else
1800                 error = do_cancel(r, lkb);
1801
1802         return error;
1803 }
1804
1805 /*
1806  * Four stage 2 varieties:
1807  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1808  */
1809
1810 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1811                         int len, struct dlm_args *args)
1812 {
1813         struct dlm_rsb *r;
1814         int error;
1815
1816         error = validate_lock_args(ls, lkb, args);
1817         if (error)
1818                 goto out;
1819
1820         error = find_rsb(ls, name, len, R_CREATE, &r);
1821         if (error)
1822                 goto out;
1823
1824         lock_rsb(r);
1825
1826         attach_lkb(r, lkb);
1827         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1828
1829         error = _request_lock(r, lkb);
1830
1831         unlock_rsb(r);
1832         put_rsb(r);
1833
1834  out:
1835         return error;
1836 }
1837
1838 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1839                         struct dlm_args *args)
1840 {
1841         struct dlm_rsb *r;
1842         int error;
1843
1844         r = lkb->lkb_resource;
1845
1846         hold_rsb(r);
1847         lock_rsb(r);
1848
1849         error = validate_lock_args(ls, lkb, args);
1850         if (error)
1851                 goto out;
1852
1853         error = _convert_lock(r, lkb);
1854  out:
1855         unlock_rsb(r);
1856         put_rsb(r);
1857         return error;
1858 }
1859
1860 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1861                        struct dlm_args *args)
1862 {
1863         struct dlm_rsb *r;
1864         int error;
1865
1866         r = lkb->lkb_resource;
1867
1868         hold_rsb(r);
1869         lock_rsb(r);
1870
1871         error = validate_unlock_args(lkb, args);
1872         if (error)
1873                 goto out;
1874
1875         error = _unlock_lock(r, lkb);
1876  out:
1877         unlock_rsb(r);
1878         put_rsb(r);
1879         return error;
1880 }
1881
1882 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1883                        struct dlm_args *args)
1884 {
1885         struct dlm_rsb *r;
1886         int error;
1887
1888         r = lkb->lkb_resource;
1889
1890         hold_rsb(r);
1891         lock_rsb(r);
1892
1893         error = validate_unlock_args(lkb, args);
1894         if (error)
1895                 goto out;
1896
1897         error = _cancel_lock(r, lkb);
1898  out:
1899         unlock_rsb(r);
1900         put_rsb(r);
1901         return error;
1902 }
1903
1904 /*
1905  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1906  */
1907
1908 int dlm_lock(dlm_lockspace_t *lockspace,
1909              int mode,
1910              struct dlm_lksb *lksb,
1911              uint32_t flags,
1912              void *name,
1913              unsigned int namelen,
1914              uint32_t parent_lkid,
1915              void (*ast) (void *astarg),
1916              void *astarg,
1917              void (*bast) (void *astarg, int mode))
1918 {
1919         struct dlm_ls *ls;
1920         struct dlm_lkb *lkb;
1921         struct dlm_args args;
1922         int error, convert = flags & DLM_LKF_CONVERT;
1923
1924         ls = dlm_find_lockspace_local(lockspace);
1925         if (!ls)
1926                 return -EINVAL;
1927
1928         lock_recovery(ls);
1929
1930         if (convert)
1931                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1932         else
1933                 error = create_lkb(ls, &lkb);
1934
1935         if (error)
1936                 goto out;
1937
1938         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1939                               astarg, bast, &args);
1940         if (error)
1941                 goto out_put;
1942
1943         if (convert)
1944                 error = convert_lock(ls, lkb, &args);
1945         else
1946                 error = request_lock(ls, lkb, name, namelen, &args);
1947
1948         if (error == -EINPROGRESS)
1949                 error = 0;
1950  out_put:
1951         if (convert || error)
1952                 put_lkb(lkb);
1953         if (error == -EAGAIN)
1954                 error = 0;
1955  out:
1956         unlock_recovery(ls);
1957         dlm_put_lockspace(ls);
1958         return error;
1959 }
1960
1961 int dlm_unlock(dlm_lockspace_t *lockspace,
1962                uint32_t lkid,
1963                uint32_t flags,
1964                struct dlm_lksb *lksb,
1965                void *astarg)
1966 {
1967         struct dlm_ls *ls;
1968         struct dlm_lkb *lkb;
1969         struct dlm_args args;
1970         int error;
1971
1972         ls = dlm_find_lockspace_local(lockspace);
1973         if (!ls)
1974                 return -EINVAL;
1975
1976         lock_recovery(ls);
1977
1978         error = find_lkb(ls, lkid, &lkb);
1979         if (error)
1980                 goto out;
1981
1982         error = set_unlock_args(flags, astarg, &args);
1983         if (error)
1984                 goto out_put;
1985
1986         if (flags & DLM_LKF_CANCEL)
1987                 error = cancel_lock(ls, lkb, &args);
1988         else
1989                 error = unlock_lock(ls, lkb, &args);
1990
1991         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
1992                 error = 0;
1993  out_put:
1994         put_lkb(lkb);
1995  out:
1996         unlock_recovery(ls);
1997         dlm_put_lockspace(ls);
1998         return error;
1999 }
2000
2001 /*
2002  * send/receive routines for remote operations and replies
2003  *
2004  * send_args
2005  * send_common
2006  * send_request                 receive_request
2007  * send_convert                 receive_convert
2008  * send_unlock                  receive_unlock
2009  * send_cancel                  receive_cancel
2010  * send_grant                   receive_grant
2011  * send_bast                    receive_bast
2012  * send_lookup                  receive_lookup
2013  * send_remove                  receive_remove
2014  *
2015  *                              send_common_reply
2016  * receive_request_reply        send_request_reply
2017  * receive_convert_reply        send_convert_reply
2018  * receive_unlock_reply         send_unlock_reply
2019  * receive_cancel_reply         send_cancel_reply
2020  * receive_lookup_reply         send_lookup_reply
2021  */
2022
2023 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2024                           int to_nodeid, int mstype,
2025                           struct dlm_message **ms_ret,
2026                           struct dlm_mhandle **mh_ret)
2027 {
2028         struct dlm_message *ms;
2029         struct dlm_mhandle *mh;
2030         char *mb;
2031         int mb_len = sizeof(struct dlm_message);
2032
2033         switch (mstype) {
2034         case DLM_MSG_REQUEST:
2035         case DLM_MSG_LOOKUP:
2036         case DLM_MSG_REMOVE:
2037                 mb_len += r->res_length;
2038                 break;
2039         case DLM_MSG_CONVERT:
2040         case DLM_MSG_UNLOCK:
2041         case DLM_MSG_REQUEST_REPLY:
2042         case DLM_MSG_CONVERT_REPLY:
2043         case DLM_MSG_GRANT:
2044                 if (lkb && lkb->lkb_lvbptr)
2045                         mb_len += r->res_ls->ls_lvblen;
2046                 break;
2047         }
2048
2049         /* get_buffer gives us a message handle (mh) that we need to
2050            pass into lowcomms_commit and a message buffer (mb) that we
2051            write our data into */
2052
2053         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2054         if (!mh)
2055                 return -ENOBUFS;
2056
2057         memset(mb, 0, mb_len);
2058
2059         ms = (struct dlm_message *) mb;
2060
2061         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2062         ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2063         ms->m_header.h_nodeid = dlm_our_nodeid();
2064         ms->m_header.h_length = mb_len;
2065         ms->m_header.h_cmd = DLM_MSG;
2066
2067         ms->m_type = mstype;
2068
2069         *mh_ret = mh;
2070         *ms_ret = ms;
2071         return 0;
2072 }
2073
2074 /* further lowcomms enhancements or alternate implementations may make
2075    the return value from this function useful at some point */
2076
2077 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2078 {
2079         dlm_message_out(ms);
2080         dlm_lowcomms_commit_buffer(mh);
2081         return 0;
2082 }
2083
2084 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2085                       struct dlm_message *ms)
2086 {
2087         ms->m_nodeid   = lkb->lkb_nodeid;
2088         ms->m_pid      = lkb->lkb_ownpid;
2089         ms->m_lkid     = lkb->lkb_id;
2090         ms->m_remid    = lkb->lkb_remid;
2091         ms->m_exflags  = lkb->lkb_exflags;
2092         ms->m_sbflags  = lkb->lkb_sbflags;
2093         ms->m_flags    = lkb->lkb_flags;
2094         ms->m_lvbseq   = lkb->lkb_lvbseq;
2095         ms->m_status   = lkb->lkb_status;
2096         ms->m_grmode   = lkb->lkb_grmode;
2097         ms->m_rqmode   = lkb->lkb_rqmode;
2098         ms->m_hash     = r->res_hash;
2099
2100         /* m_result and m_bastmode are set from function args,
2101            not from lkb fields */
2102
2103         if (lkb->lkb_bastaddr)
2104                 ms->m_asts |= AST_BAST;
2105         if (lkb->lkb_astaddr)
2106                 ms->m_asts |= AST_COMP;
2107
2108         if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2109                 memcpy(ms->m_extra, r->res_name, r->res_length);
2110
2111         else if (lkb->lkb_lvbptr)
2112                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2113
2114 }
2115
2116 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2117 {
2118         struct dlm_message *ms;
2119         struct dlm_mhandle *mh;
2120         int to_nodeid, error;
2121
2122         add_to_waiters(lkb, mstype);
2123
2124         to_nodeid = r->res_nodeid;
2125
2126         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2127         if (error)
2128                 goto fail;
2129
2130         send_args(r, lkb, ms);
2131
2132         error = send_message(mh, ms);
2133         if (error)
2134                 goto fail;
2135         return 0;
2136
2137  fail:
2138         remove_from_waiters(lkb);
2139         return error;
2140 }
2141
2142 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2143 {
2144         return send_common(r, lkb, DLM_MSG_REQUEST);
2145 }
2146
2147 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2148 {
2149         int error;
2150
2151         error = send_common(r, lkb, DLM_MSG_CONVERT);
2152
2153         /* down conversions go without a reply from the master */
2154         if (!error && down_conversion(lkb)) {
2155                 remove_from_waiters(lkb);
2156                 r->res_ls->ls_stub_ms.m_result = 0;
2157                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2158         }
2159
2160         return error;
2161 }
2162
2163 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2164    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2165    that the master is still correct. */
2166
2167 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2168 {
2169         return send_common(r, lkb, DLM_MSG_UNLOCK);
2170 }
2171
2172 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2173 {
2174         return send_common(r, lkb, DLM_MSG_CANCEL);
2175 }
2176
2177 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2178 {
2179         struct dlm_message *ms;
2180         struct dlm_mhandle *mh;
2181         int to_nodeid, error;
2182
2183         to_nodeid = lkb->lkb_nodeid;
2184
2185         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2186         if (error)
2187                 goto out;
2188
2189         send_args(r, lkb, ms);
2190
2191         ms->m_result = 0;
2192
2193         error = send_message(mh, ms);
2194  out:
2195         return error;
2196 }
2197
2198 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2199 {
2200         struct dlm_message *ms;
2201         struct dlm_mhandle *mh;
2202         int to_nodeid, error;
2203
2204         to_nodeid = lkb->lkb_nodeid;
2205
2206         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2207         if (error)
2208                 goto out;
2209
2210         send_args(r, lkb, ms);
2211
2212         ms->m_bastmode = mode;
2213
2214         error = send_message(mh, ms);
2215  out:
2216         return error;
2217 }
2218
2219 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2220 {
2221         struct dlm_message *ms;
2222         struct dlm_mhandle *mh;
2223         int to_nodeid, error;
2224
2225         add_to_waiters(lkb, DLM_MSG_LOOKUP);
2226
2227         to_nodeid = dlm_dir_nodeid(r);
2228
2229         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2230         if (error)
2231                 goto fail;
2232
2233         send_args(r, lkb, ms);
2234
2235         error = send_message(mh, ms);
2236         if (error)
2237                 goto fail;
2238         return 0;
2239
2240  fail:
2241         remove_from_waiters(lkb);
2242         return error;
2243 }
2244
2245 static int send_remove(struct dlm_rsb *r)
2246 {
2247         struct dlm_message *ms;
2248         struct dlm_mhandle *mh;
2249         int to_nodeid, error;
2250
2251         to_nodeid = dlm_dir_nodeid(r);
2252
2253         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2254         if (error)
2255                 goto out;
2256
2257         memcpy(ms->m_extra, r->res_name, r->res_length);
2258         ms->m_hash = r->res_hash;
2259
2260         error = send_message(mh, ms);
2261  out:
2262         return error;
2263 }
2264
2265 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2266                              int mstype, int rv)
2267 {
2268         struct dlm_message *ms;
2269         struct dlm_mhandle *mh;
2270         int to_nodeid, error;
2271
2272         to_nodeid = lkb->lkb_nodeid;
2273
2274         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2275         if (error)
2276                 goto out;
2277
2278         send_args(r, lkb, ms);
2279
2280         ms->m_result = rv;
2281
2282         error = send_message(mh, ms);
2283  out:
2284         return error;
2285 }
2286
2287 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2288 {
2289         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2290 }
2291
2292 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2293 {
2294         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2295 }
2296
2297 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2298 {
2299         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2300 }
2301
2302 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2303 {
2304         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2305 }
2306
2307 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2308                              int ret_nodeid, int rv)
2309 {
2310         struct dlm_rsb *r = &ls->ls_stub_rsb;
2311         struct dlm_message *ms;
2312         struct dlm_mhandle *mh;
2313         int error, nodeid = ms_in->m_header.h_nodeid;
2314
2315         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2316         if (error)
2317                 goto out;
2318
2319         ms->m_lkid = ms_in->m_lkid;
2320         ms->m_result = rv;
2321         ms->m_nodeid = ret_nodeid;
2322
2323         error = send_message(mh, ms);
2324  out:
2325         return error;
2326 }
2327
2328 /* which args we save from a received message depends heavily on the type
2329    of message, unlike the send side where we can safely send everything about
2330    the lkb for any type of message */
2331
2332 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2333 {
2334         lkb->lkb_exflags = ms->m_exflags;
2335         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2336                          (ms->m_flags & 0x0000FFFF);
2337 }
2338
2339 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2340 {
2341         lkb->lkb_sbflags = ms->m_sbflags;
2342         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2343                          (ms->m_flags & 0x0000FFFF);
2344 }
2345
2346 static int receive_extralen(struct dlm_message *ms)
2347 {
2348         return (ms->m_header.h_length - sizeof(struct dlm_message));
2349 }
2350
2351 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2352                        struct dlm_message *ms)
2353 {
2354         int len;
2355
2356         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2357                 if (!lkb->lkb_lvbptr)
2358                         lkb->lkb_lvbptr = allocate_lvb(ls);
2359                 if (!lkb->lkb_lvbptr)
2360                         return -ENOMEM;
2361                 len = receive_extralen(ms);
2362                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2363         }
2364         return 0;
2365 }
2366
2367 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2368                                 struct dlm_message *ms)
2369 {
2370         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2371         lkb->lkb_ownpid = ms->m_pid;
2372         lkb->lkb_remid = ms->m_lkid;
2373         lkb->lkb_grmode = DLM_LOCK_IV;
2374         lkb->lkb_rqmode = ms->m_rqmode;
2375         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2376         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2377
2378         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2379
2380         if (receive_lvb(ls, lkb, ms))
2381                 return -ENOMEM;
2382
2383         return 0;
2384 }
2385
2386 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2387                                 struct dlm_message *ms)
2388 {
2389         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2390                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2391                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2392                           lkb->lkb_id, lkb->lkb_remid);
2393                 return -EINVAL;
2394         }
2395
2396         if (!is_master_copy(lkb))
2397                 return -EINVAL;
2398
2399         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2400                 return -EBUSY;
2401
2402         if (receive_lvb(ls, lkb, ms))
2403                 return -ENOMEM;
2404
2405         lkb->lkb_rqmode = ms->m_rqmode;
2406         lkb->lkb_lvbseq = ms->m_lvbseq;
2407
2408         return 0;
2409 }
2410
2411 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2412                                struct dlm_message *ms)
2413 {
2414         if (!is_master_copy(lkb))
2415                 return -EINVAL;
2416         if (receive_lvb(ls, lkb, ms))
2417                 return -ENOMEM;
2418         return 0;
2419 }
2420
2421 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2422    uses to send a reply and that the remote end uses to process the reply. */
2423
2424 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2425 {
2426         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2427         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2428         lkb->lkb_remid = ms->m_lkid;
2429 }
2430
2431 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2432 {
2433         struct dlm_lkb *lkb;
2434         struct dlm_rsb *r;
2435         int error, namelen;
2436
2437         error = create_lkb(ls, &lkb);
2438         if (error)
2439                 goto fail;
2440
2441         receive_flags(lkb, ms);
2442         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2443         error = receive_request_args(ls, lkb, ms);
2444         if (error) {
2445                 put_lkb(lkb);
2446                 goto fail;
2447         }
2448
2449         namelen = receive_extralen(ms);
2450
2451         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2452         if (error) {
2453                 put_lkb(lkb);
2454                 goto fail;
2455         }
2456
2457         lock_rsb(r);
2458
2459         attach_lkb(r, lkb);
2460         error = do_request(r, lkb);
2461         send_request_reply(r, lkb, error);
2462
2463         unlock_rsb(r);
2464         put_rsb(r);
2465
2466         if (error == -EINPROGRESS)
2467                 error = 0;
2468         if (error)
2469                 put_lkb(lkb);
2470         return;
2471
2472  fail:
2473         setup_stub_lkb(ls, ms);
2474         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2475 }
2476
2477 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2478 {
2479         struct dlm_lkb *lkb;
2480         struct dlm_rsb *r;
2481         int error, reply = 1;
2482
2483         error = find_lkb(ls, ms->m_remid, &lkb);
2484         if (error)
2485                 goto fail;
2486
2487         r = lkb->lkb_resource;
2488
2489         hold_rsb(r);
2490         lock_rsb(r);
2491
2492         receive_flags(lkb, ms);
2493         error = receive_convert_args(ls, lkb, ms);
2494         if (error)
2495                 goto out;
2496         reply = !down_conversion(lkb);
2497
2498         error = do_convert(r, lkb);
2499  out:
2500         if (reply)
2501                 send_convert_reply(r, lkb, error);
2502
2503         unlock_rsb(r);
2504         put_rsb(r);
2505         put_lkb(lkb);
2506         return;
2507
2508  fail:
2509         setup_stub_lkb(ls, ms);
2510         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2511 }
2512
2513 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2514 {
2515         struct dlm_lkb *lkb;
2516         struct dlm_rsb *r;
2517         int error;
2518
2519         error = find_lkb(ls, ms->m_remid, &lkb);
2520         if (error)
2521                 goto fail;
2522
2523         r = lkb->lkb_resource;
2524
2525         hold_rsb(r);
2526         lock_rsb(r);
2527
2528         receive_flags(lkb, ms);
2529         error = receive_unlock_args(ls, lkb, ms);
2530         if (error)
2531                 goto out;
2532
2533         error = do_unlock(r, lkb);
2534  out:
2535         send_unlock_reply(r, lkb, error);
2536
2537         unlock_rsb(r);
2538         put_rsb(r);
2539         put_lkb(lkb);
2540         return;
2541
2542  fail:
2543         setup_stub_lkb(ls, ms);
2544         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2545 }
2546
2547 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2548 {
2549         struct dlm_lkb *lkb;
2550         struct dlm_rsb *r;
2551         int error;
2552
2553         error = find_lkb(ls, ms->m_remid, &lkb);
2554         if (error)
2555                 goto fail;
2556
2557         receive_flags(lkb, ms);
2558
2559         r = lkb->lkb_resource;
2560
2561         hold_rsb(r);
2562         lock_rsb(r);
2563
2564         error = do_cancel(r, lkb);
2565         send_cancel_reply(r, lkb, error);
2566
2567         unlock_rsb(r);
2568         put_rsb(r);
2569         put_lkb(lkb);
2570         return;
2571
2572  fail:
2573         setup_stub_lkb(ls, ms);
2574         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2575 }
2576
2577 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2578 {
2579         struct dlm_lkb *lkb;
2580         struct dlm_rsb *r;
2581         int error;
2582
2583         error = find_lkb(ls, ms->m_remid, &lkb);
2584         if (error) {
2585                 log_error(ls, "receive_grant no lkb");
2586                 return;
2587         }
2588         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2589
2590         r = lkb->lkb_resource;
2591
2592         hold_rsb(r);
2593         lock_rsb(r);
2594
2595         receive_flags_reply(lkb, ms);
2596         grant_lock_pc(r, lkb, ms);
2597         queue_cast(r, lkb, 0);
2598
2599         unlock_rsb(r);
2600         put_rsb(r);
2601         put_lkb(lkb);
2602 }
2603
2604 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2605 {
2606         struct dlm_lkb *lkb;
2607         struct dlm_rsb *r;
2608         int error;
2609
2610         error = find_lkb(ls, ms->m_remid, &lkb);
2611         if (error) {
2612                 log_error(ls, "receive_bast no lkb");
2613                 return;
2614         }
2615         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2616
2617         r = lkb->lkb_resource;
2618
2619         hold_rsb(r);
2620         lock_rsb(r);
2621
2622         queue_bast(r, lkb, ms->m_bastmode);
2623
2624         unlock_rsb(r);
2625         put_rsb(r);
2626         put_lkb(lkb);
2627 }
2628
2629 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2630 {
2631         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2632
2633         from_nodeid = ms->m_header.h_nodeid;
2634         our_nodeid = dlm_our_nodeid();
2635
2636         len = receive_extralen(ms);
2637
2638         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2639         if (dir_nodeid != our_nodeid) {
2640                 log_error(ls, "lookup dir_nodeid %d from %d",
2641                           dir_nodeid, from_nodeid);
2642                 error = -EINVAL;
2643                 ret_nodeid = -1;
2644                 goto out;
2645         }
2646
2647         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2648
2649         /* Optimization: we're master so treat lookup as a request */
2650         if (!error && ret_nodeid == our_nodeid) {
2651                 receive_request(ls, ms);
2652                 return;
2653         }
2654  out:
2655         send_lookup_reply(ls, ms, ret_nodeid, error);
2656 }
2657
2658 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2659 {
2660         int len, dir_nodeid, from_nodeid;
2661
2662         from_nodeid = ms->m_header.h_nodeid;
2663
2664         len = receive_extralen(ms);
2665
2666         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2667         if (dir_nodeid != dlm_our_nodeid()) {
2668                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2669                           dir_nodeid, from_nodeid);
2670                 return;
2671         }
2672
2673         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2674 }
2675
2676 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2677 {
2678         struct dlm_lkb *lkb;
2679         struct dlm_rsb *r;
2680         int error, mstype;
2681
2682         error = find_lkb(ls, ms->m_remid, &lkb);
2683         if (error) {
2684                 log_error(ls, "receive_request_reply no lkb");
2685                 return;
2686         }
2687         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2688
2689         mstype = lkb->lkb_wait_type;
2690         error = remove_from_waiters(lkb);
2691         if (error) {
2692                 log_error(ls, "receive_request_reply not on waiters");
2693                 goto out;
2694         }
2695
2696         /* this is the value returned from do_request() on the master */
2697         error = ms->m_result;
2698
2699         r = lkb->lkb_resource;
2700         hold_rsb(r);
2701         lock_rsb(r);
2702
2703         /* Optimization: the dir node was also the master, so it took our
2704            lookup as a request and sent request reply instead of lookup reply */
2705         if (mstype == DLM_MSG_LOOKUP) {
2706                 r->res_nodeid = ms->m_header.h_nodeid;
2707                 lkb->lkb_nodeid = r->res_nodeid;
2708         }
2709
2710         switch (error) {
2711         case -EAGAIN:
2712                 /* request would block (be queued) on remote master;
2713                    the unhold undoes the original ref from create_lkb()
2714                    so it leads to the lkb being freed */
2715                 queue_cast(r, lkb, -EAGAIN);
2716                 confirm_master(r, -EAGAIN);
2717                 unhold_lkb(lkb);
2718                 break;
2719
2720         case -EINPROGRESS:
2721         case 0:
2722                 /* request was queued or granted on remote master */
2723                 receive_flags_reply(lkb, ms);
2724                 lkb->lkb_remid = ms->m_lkid;
2725                 if (error)
2726                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
2727                 else {
2728                         grant_lock_pc(r, lkb, ms);
2729                         queue_cast(r, lkb, 0);
2730                 }
2731                 confirm_master(r, error);
2732                 break;
2733
2734         case -ENOENT:
2735         case -ENOTBLK:
2736                 /* find_rsb failed to find rsb or rsb wasn't master */
2737                 r->res_nodeid = -1;
2738                 lkb->lkb_nodeid = -1;
2739                 _request_lock(r, lkb);
2740                 break;
2741
2742         default:
2743                 log_error(ls, "receive_request_reply error %d", error);
2744         }
2745
2746         unlock_rsb(r);
2747         put_rsb(r);
2748  out:
2749         put_lkb(lkb);
2750 }
2751
2752 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2753                                     struct dlm_message *ms)
2754 {
2755         int error = ms->m_result;
2756
2757         /* this is the value returned from do_convert() on the master */
2758
2759         switch (error) {
2760         case -EAGAIN:
2761                 /* convert would block (be queued) on remote master */
2762                 queue_cast(r, lkb, -EAGAIN);
2763                 break;
2764
2765         case -EINPROGRESS:
2766                 /* convert was queued on remote master */
2767                 del_lkb(r, lkb);
2768                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2769                 break;
2770
2771         case 0:
2772                 /* convert was granted on remote master */
2773                 receive_flags_reply(lkb, ms);
2774                 grant_lock_pc(r, lkb, ms);
2775                 queue_cast(r, lkb, 0);
2776                 break;
2777
2778         default:
2779                 log_error(r->res_ls, "receive_convert_reply error %d", error);
2780         }
2781 }
2782
2783 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2784 {
2785         struct dlm_rsb *r = lkb->lkb_resource;
2786
2787         hold_rsb(r);
2788         lock_rsb(r);
2789
2790         __receive_convert_reply(r, lkb, ms);
2791
2792         unlock_rsb(r);
2793         put_rsb(r);
2794 }
2795
2796 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2797 {
2798         struct dlm_lkb *lkb;
2799         int error;
2800
2801         error = find_lkb(ls, ms->m_remid, &lkb);
2802         if (error) {
2803                 log_error(ls, "receive_convert_reply no lkb");
2804                 return;
2805         }
2806         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2807
2808         error = remove_from_waiters(lkb);
2809         if (error) {
2810                 log_error(ls, "receive_convert_reply not on waiters");
2811                 goto out;
2812         }
2813
2814         _receive_convert_reply(lkb, ms);
2815  out:
2816         put_lkb(lkb);
2817 }
2818
2819 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2820 {
2821         struct dlm_rsb *r = lkb->lkb_resource;
2822         int error = ms->m_result;
2823
2824         hold_rsb(r);
2825         lock_rsb(r);
2826
2827         /* this is the value returned from do_unlock() on the master */
2828
2829         switch (error) {
2830         case -DLM_EUNLOCK:
2831                 receive_flags_reply(lkb, ms);
2832                 remove_lock_pc(r, lkb);
2833                 queue_cast(r, lkb, -DLM_EUNLOCK);
2834                 break;
2835         default:
2836                 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2837         }
2838
2839         unlock_rsb(r);
2840         put_rsb(r);
2841 }
2842
2843 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2844 {
2845         struct dlm_lkb *lkb;
2846         int error;
2847
2848         error = find_lkb(ls, ms->m_remid, &lkb);
2849         if (error) {
2850                 log_error(ls, "receive_unlock_reply no lkb");
2851                 return;
2852         }
2853         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2854
2855         error = remove_from_waiters(lkb);
2856         if (error) {
2857                 log_error(ls, "receive_unlock_reply not on waiters");
2858                 goto out;
2859         }
2860
2861         _receive_unlock_reply(lkb, ms);
2862  out:
2863         put_lkb(lkb);
2864 }
2865
2866 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2867 {
2868         struct dlm_rsb *r = lkb->lkb_resource;
2869         int error = ms->m_result;
2870
2871         hold_rsb(r);
2872         lock_rsb(r);
2873
2874         /* this is the value returned from do_cancel() on the master */
2875
2876         switch (error) {
2877         case -DLM_ECANCEL:
2878                 receive_flags_reply(lkb, ms);
2879                 revert_lock_pc(r, lkb);
2880                 queue_cast(r, lkb, -DLM_ECANCEL);
2881                 break;
2882         default:
2883                 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2884         }
2885
2886         unlock_rsb(r);
2887         put_rsb(r);
2888 }
2889
2890 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2891 {
2892         struct dlm_lkb *lkb;
2893         int error;
2894
2895         error = find_lkb(ls, ms->m_remid, &lkb);
2896         if (error) {
2897                 log_error(ls, "receive_cancel_reply no lkb");
2898                 return;
2899         }
2900         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2901
2902         error = remove_from_waiters(lkb);
2903         if (error) {
2904                 log_error(ls, "receive_cancel_reply not on waiters");
2905                 goto out;
2906         }
2907
2908         _receive_cancel_reply(lkb, ms);
2909  out:
2910         put_lkb(lkb);
2911 }
2912
2913 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2914 {
2915         struct dlm_lkb *lkb;
2916         struct dlm_rsb *r;
2917         int error, ret_nodeid;
2918
2919         error = find_lkb(ls, ms->m_lkid, &lkb);
2920         if (error) {
2921                 log_error(ls, "receive_lookup_reply no lkb");
2922                 return;
2923         }
2924
2925         error = remove_from_waiters(lkb);
2926         if (error) {
2927                 log_error(ls, "receive_lookup_reply not on waiters");
2928                 goto out;
2929         }
2930
2931         /* this is the value returned by dlm_dir_lookup on dir node
2932            FIXME: will a non-zero error ever be returned? */
2933         error = ms->m_result;
2934
2935         r = lkb->lkb_resource;
2936         hold_rsb(r);
2937         lock_rsb(r);
2938
2939         ret_nodeid = ms->m_nodeid;
2940         if (ret_nodeid == dlm_our_nodeid()) {
2941                 r->res_nodeid = 0;
2942                 ret_nodeid = 0;
2943                 r->res_first_lkid = 0;
2944         } else {
2945                 /* set_master() will copy res_nodeid to lkb_nodeid */
2946                 r->res_nodeid = ret_nodeid;
2947         }
2948
2949         _request_lock(r, lkb);
2950
2951         if (!ret_nodeid)
2952                 process_lookup_list(r);
2953
2954         unlock_rsb(r);
2955         put_rsb(r);
2956  out:
2957         put_lkb(lkb);
2958 }
2959
2960 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2961 {
2962         struct dlm_message *ms = (struct dlm_message *) hd;
2963         struct dlm_ls *ls;
2964         int error;
2965
2966         if (!recovery)
2967                 dlm_message_in(ms);
2968
2969         ls = dlm_find_lockspace_global(hd->h_lockspace);
2970         if (!ls) {
2971                 log_print("drop message %d from %d for unknown lockspace %d",
2972                           ms->m_type, nodeid, hd->h_lockspace);
2973                 return -EINVAL;
2974         }
2975
2976         /* recovery may have just ended leaving a bunch of backed-up requests
2977            in the requestqueue; wait while dlm_recoverd clears them */
2978
2979         if (!recovery)
2980                 dlm_wait_requestqueue(ls);
2981
2982         /* recovery may have just started while there were a bunch of
2983            in-flight requests -- save them in requestqueue to be processed
2984            after recovery.  we can't let dlm_recvd block on the recovery
2985            lock.  if dlm_recoverd is calling this function to clear the
2986            requestqueue, it needs to be interrupted (-EINTR) if another
2987            recovery operation is starting. */
2988
2989         while (1) {
2990                 if (dlm_locking_stopped(ls)) {
2991                         if (!recovery)
2992                                 dlm_add_requestqueue(ls, nodeid, hd);
2993                         error = -EINTR;
2994                         goto out;
2995                 }
2996
2997                 if (lock_recovery_try(ls))
2998                         break;
2999                 schedule();
3000         }
3001
3002         switch (ms->m_type) {
3003
3004         /* messages sent to a master node */
3005
3006         case DLM_MSG_REQUEST:
3007                 receive_request(ls, ms);
3008                 break;
3009
3010         case DLM_MSG_CONVERT:
3011                 receive_convert(ls, ms);
3012                 break;
3013
3014         case DLM_MSG_UNLOCK:
3015                 receive_unlock(ls, ms);
3016                 break;
3017
3018         case DLM_MSG_CANCEL:
3019                 receive_cancel(ls, ms);
3020                 break;
3021
3022         /* messages sent from a master node (replies to above) */
3023
3024         case DLM_MSG_REQUEST_REPLY:
3025                 receive_request_reply(ls, ms);
3026                 break;
3027
3028         case DLM_MSG_CONVERT_REPLY:
3029                 receive_convert_reply(ls, ms);
3030                 break;
3031
3032         case DLM_MSG_UNLOCK_REPLY:
3033                 receive_unlock_reply(ls, ms);
3034                 break;
3035
3036         case DLM_MSG_CANCEL_REPLY:
3037                 receive_cancel_reply(ls, ms);
3038                 break;
3039
3040         /* messages sent from a master node (only two types of async msg) */
3041
3042         case DLM_MSG_GRANT:
3043                 receive_grant(ls, ms);
3044                 break;
3045
3046         case DLM_MSG_BAST:
3047                 receive_bast(ls, ms);
3048                 break;
3049
3050         /* messages sent to a dir node */
3051
3052         case DLM_MSG_LOOKUP:
3053                 receive_lookup(ls, ms);
3054                 break;
3055
3056         case DLM_MSG_REMOVE:
3057                 receive_remove(ls, ms);
3058                 break;
3059
3060         /* messages sent from a dir node (remove has no reply) */
3061
3062         case DLM_MSG_LOOKUP_REPLY:
3063                 receive_lookup_reply(ls, ms);
3064                 break;
3065
3066         default:
3067                 log_error(ls, "unknown message type %d", ms->m_type);
3068         }
3069
3070         unlock_recovery(ls);
3071  out:
3072         dlm_put_lockspace(ls);
3073         dlm_astd_wake();
3074         return 0;
3075 }
3076
3077
3078 /*
3079  * Recovery related
3080  */
3081
3082 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3083 {
3084         if (middle_conversion(lkb)) {
3085                 hold_lkb(lkb);
3086                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3087                 _remove_from_waiters(lkb);
3088                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3089
3090                 /* Same special case as in receive_rcom_lock_args() */
3091                 lkb->lkb_grmode = DLM_LOCK_IV;
3092                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3093                 unhold_lkb(lkb);
3094
3095         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3096                 lkb->lkb_flags |= DLM_IFL_RESEND;
3097         }
3098
3099         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3100            conversions are async; there's no reply from the remote master */
3101 }
3102
3103 /* A waiting lkb needs recovery if the master node has failed, or
3104    the master node is changing (only when no directory is used) */
3105
3106 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3107 {
3108         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3109                 return 1;
3110
3111         if (!dlm_no_directory(ls))
3112                 return 0;
3113
3114         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3115                 return 1;
3116
3117         return 0;
3118 }
3119
3120 /* Recovery for locks that are waiting for replies from nodes that are now
3121    gone.  We can just complete unlocks and cancels by faking a reply from the
3122    dead node.  Requests and up-conversions we flag to be resent after
3123    recovery.  Down-conversions can just be completed with a fake reply like
3124    unlocks.  Conversions between PR and CW need special attention. */
3125
3126 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3127 {
3128         struct dlm_lkb *lkb, *safe;
3129
3130         mutex_lock(&ls->ls_waiters_mutex);
3131
3132         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3133                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3134                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3135
3136                 /* all outstanding lookups, regardless of destination  will be
3137                    resent after recovery is done */
3138
3139                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3140                         lkb->lkb_flags |= DLM_IFL_RESEND;
3141                         continue;
3142                 }
3143
3144                 if (!waiter_needs_recovery(ls, lkb))
3145                         continue;
3146
3147                 switch (lkb->lkb_wait_type) {
3148
3149                 case DLM_MSG_REQUEST:
3150                         lkb->lkb_flags |= DLM_IFL_RESEND;
3151                         break;
3152
3153                 case DLM_MSG_CONVERT:
3154                         recover_convert_waiter(ls, lkb);
3155                         break;
3156
3157                 case DLM_MSG_UNLOCK:
3158                         hold_lkb(lkb);
3159                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3160                         _remove_from_waiters(lkb);
3161                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3162                         put_lkb(lkb);
3163                         break;
3164
3165                 case DLM_MSG_CANCEL:
3166                         hold_lkb(lkb);
3167                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3168                         _remove_from_waiters(lkb);
3169                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3170                         put_lkb(lkb);
3171                         break;
3172
3173                 default:
3174                         log_error(ls, "invalid lkb wait_type %d",
3175                                   lkb->lkb_wait_type);
3176                 }
3177         }
3178         mutex_unlock(&ls->ls_waiters_mutex);
3179 }
3180
3181 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3182 {
3183         struct dlm_lkb *lkb;
3184         int rv = 0;
3185
3186         mutex_lock(&ls->ls_waiters_mutex);
3187         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3188                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3189                         rv = lkb->lkb_wait_type;
3190                         _remove_from_waiters(lkb);
3191                         lkb->lkb_flags &= ~DLM_IFL_RESEND;
3192                         break;
3193                 }
3194         }
3195         mutex_unlock(&ls->ls_waiters_mutex);
3196
3197         if (!rv)
3198                 lkb = NULL;
3199         *lkb_ret = lkb;
3200         return rv;
3201 }
3202
3203 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3204    master or dir-node for r.  Processing the lkb may result in it being placed
3205    back on waiters. */
3206
3207 int dlm_recover_waiters_post(struct dlm_ls *ls)
3208 {
3209         struct dlm_lkb *lkb;
3210         struct dlm_rsb *r;
3211         int error = 0, mstype;
3212
3213         while (1) {
3214                 if (dlm_locking_stopped(ls)) {
3215                         log_debug(ls, "recover_waiters_post aborted");
3216                         error = -EINTR;
3217                         break;
3218                 }
3219
3220                 mstype = remove_resend_waiter(ls, &lkb);
3221                 if (!mstype)
3222                         break;
3223
3224                 r = lkb->lkb_resource;
3225
3226                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3227                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3228
3229                 switch (mstype) {
3230
3231                 case DLM_MSG_LOOKUP:
3232                         hold_rsb(r);
3233                         lock_rsb(r);
3234                         _request_lock(r, lkb);
3235                         if (is_master(r))
3236                                 confirm_master(r, 0);
3237                         unlock_rsb(r);
3238                         put_rsb(r);
3239                         break;
3240
3241                 case DLM_MSG_REQUEST:
3242                         hold_rsb(r);
3243                         lock_rsb(r);
3244                         _request_lock(r, lkb);
3245                         unlock_rsb(r);
3246                         put_rsb(r);
3247                         break;
3248
3249                 case DLM_MSG_CONVERT:
3250                         hold_rsb(r);
3251                         lock_rsb(r);
3252                         _convert_lock(r, lkb);
3253                         unlock_rsb(r);
3254                         put_rsb(r);
3255                         break;
3256
3257                 default:
3258                         log_error(ls, "recover_waiters_post type %d", mstype);
3259                 }
3260         }
3261
3262         return error;
3263 }
3264
3265 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3266                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3267 {
3268         struct dlm_ls *ls = r->res_ls;
3269         struct dlm_lkb *lkb, *safe;
3270
3271         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3272                 if (test(ls, lkb)) {
3273                         del_lkb(r, lkb);
3274                         /* this put should free the lkb */
3275                         if (!put_lkb(lkb))
3276                                 log_error(ls, "purged lkb not released");
3277                 }
3278         }
3279 }
3280
3281 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3282 {
3283         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3284 }
3285
3286 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3287 {
3288         return is_master_copy(lkb);
3289 }
3290
3291 static void purge_dead_locks(struct dlm_rsb *r)
3292 {
3293         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3294         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3295         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3296 }
3297
3298 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3299 {
3300         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3301         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3302         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3303 }
3304
3305 /* Get rid of locks held by nodes that are gone. */
3306
3307 int dlm_purge_locks(struct dlm_ls *ls)
3308 {
3309         struct dlm_rsb *r;
3310
3311         log_debug(ls, "dlm_purge_locks");
3312
3313         down_write(&ls->ls_root_sem);
3314         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3315                 hold_rsb(r);
3316                 lock_rsb(r);
3317                 if (is_master(r))
3318                         purge_dead_locks(r);
3319                 unlock_rsb(r);
3320                 unhold_rsb(r);
3321
3322                 schedule();
3323         }
3324         up_write(&ls->ls_root_sem);
3325
3326         return 0;
3327 }
3328
3329 int dlm_grant_after_purge(struct dlm_ls *ls)
3330 {
3331         struct dlm_rsb *r;
3332         int i;
3333
3334         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3335                 read_lock(&ls->ls_rsbtbl[i].lock);
3336                 list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
3337                         hold_rsb(r);
3338                         lock_rsb(r);
3339                         if (is_master(r)) {
3340                                 grant_pending_locks(r);
3341                                 confirm_master(r, 0);
3342                         }
3343                         unlock_rsb(r);
3344                         put_rsb(r);
3345                 }
3346                 read_unlock(&ls->ls_rsbtbl[i].lock);
3347         }
3348
3349         return 0;
3350 }
3351
3352 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3353                                          uint32_t remid)
3354 {
3355         struct dlm_lkb *lkb;
3356
3357         list_for_each_entry(lkb, head, lkb_statequeue) {
3358                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3359                         return lkb;
3360         }
3361         return NULL;
3362 }
3363
3364 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3365                                     uint32_t remid)
3366 {
3367         struct dlm_lkb *lkb;
3368
3369         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3370         if (lkb)
3371                 return lkb;
3372         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3373         if (lkb)
3374                 return lkb;
3375         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3376         if (lkb)
3377                 return lkb;
3378         return NULL;
3379 }
3380
3381 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3382                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3383 {
3384         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3385         int lvblen;
3386
3387         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3388         lkb->lkb_ownpid = rl->rl_ownpid;
3389         lkb->lkb_remid = rl->rl_lkid;
3390         lkb->lkb_exflags = rl->rl_exflags;
3391         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3392         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3393         lkb->lkb_lvbseq = rl->rl_lvbseq;
3394         lkb->lkb_rqmode = rl->rl_rqmode;
3395         lkb->lkb_grmode = rl->rl_grmode;
3396         /* don't set lkb_status because add_lkb wants to itself */
3397
3398         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3399         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3400
3401         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3402                 lkb->lkb_lvbptr = allocate_lvb(ls);
3403                 if (!lkb->lkb_lvbptr)
3404                         return -ENOMEM;
3405                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3406                          sizeof(struct rcom_lock);
3407                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3408         }
3409
3410         /* Conversions between PR and CW (middle modes) need special handling.
3411            The real granted mode of these converting locks cannot be determined
3412            until all locks have been rebuilt on the rsb (recover_conversion) */
3413
3414         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3415                 rl->rl_status = DLM_LKSTS_CONVERT;
3416                 lkb->lkb_grmode = DLM_LOCK_IV;
3417                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3418         }
3419
3420         return 0;
3421 }
3422
3423 /* This lkb may have been recovered in a previous aborted recovery so we need
3424    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3425    If so we just send back a standard reply.  If not, we create a new lkb with
3426    the given values and send back our lkid.  We send back our lkid by sending
3427    back the rcom_lock struct we got but with the remid field filled in. */
3428
3429 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3430 {
3431         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3432         struct dlm_rsb *r;
3433         struct dlm_lkb *lkb;
3434         int error;
3435
3436         if (rl->rl_parent_lkid) {
3437                 error = -EOPNOTSUPP;
3438                 goto out;
3439         }
3440
3441         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3442         if (error)
3443                 goto out;
3444
3445         lock_rsb(r);
3446
3447         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3448         if (lkb) {
3449                 error = -EEXIST;
3450                 goto out_remid;
3451         }
3452
3453         error = create_lkb(ls, &lkb);
3454         if (error)
3455                 goto out_unlock;
3456
3457         error = receive_rcom_lock_args(ls, lkb, r, rc);
3458         if (error) {
3459                 put_lkb(lkb);
3460                 goto out_unlock;
3461         }
3462
3463         attach_lkb(r, lkb);
3464         add_lkb(r, lkb, rl->rl_status);
3465         error = 0;
3466
3467  out_remid:
3468         /* this is the new value returned to the lock holder for
3469            saving in its process-copy lkb */
3470         rl->rl_remid = lkb->lkb_id;
3471
3472  out_unlock:
3473         unlock_rsb(r);
3474         put_rsb(r);
3475  out:
3476         if (error)
3477                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3478         rl->rl_result = error;
3479         return error;
3480 }
3481
3482 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3483 {
3484         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3485         struct dlm_rsb *r;
3486         struct dlm_lkb *lkb;
3487         int error;
3488
3489         error = find_lkb(ls, rl->rl_lkid, &lkb);
3490         if (error) {
3491                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3492                 return error;
3493         }
3494
3495         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3496
3497         error = rl->rl_result;
3498
3499         r = lkb->lkb_resource;
3500         hold_rsb(r);
3501         lock_rsb(r);
3502
3503         switch (error) {
3504         case -EEXIST:
3505                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3506                 /* fall through */
3507         case 0:
3508                 lkb->lkb_remid = rl->rl_remid;
3509                 break;
3510         default:
3511                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3512                           error, lkb->lkb_id);
3513         }
3514
3515         /* an ack for dlm_recover_locks() which waits for replies from
3516            all the locks it sends to new masters */
3517         dlm_recovered_lock(r);
3518
3519         unlock_rsb(r);
3520         put_rsb(r);
3521         put_lkb(lkb);
3522
3523         return 0;
3524 }
3525