]> err.no Git - linux-2.6/blob - fs/dlm/lock.c
[DLM] [RFC: -mm patch] fs/dlm/lock.c: unexport dlm_lvb_operations
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88
89 #define FAKE_USER_AST (void*)0xff00ff00
90
91 /*
92  * Lock compatibilty matrix - thanks Steve
93  * UN = Unlocked state. Not really a state, used as a flag
94  * PD = Padding. Used to make the matrix a nice power of two in size
95  * Other states are the same as the VMS DLM.
96  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
97  */
98
99 static const int __dlm_compat_matrix[8][8] = {
100       /* UN NL CR CW PR PW EX PD */
101         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
103         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
104         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
105         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
106         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
107         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
108         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
109 };
110
111 /*
112  * This defines the direction of transfer of LVB data.
113  * Granted mode is the row; requested mode is the column.
114  * Usage: matrix[grmode+1][rqmode+1]
115  * 1 = LVB is returned to the caller
116  * 0 = LVB is written to the resource
117  * -1 = nothing happens to the LVB
118  */
119
120 const int dlm_lvb_operations[8][8] = {
121         /* UN   NL  CR  CW  PR  PW  EX  PD*/
122         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
123         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
124         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
125         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
126         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
127         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
128         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
130 };
131
132 #define modes_compat(gr, rq) \
133         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
134
135 int dlm_modes_compat(int mode1, int mode2)
136 {
137         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
138 }
139
140 /*
141  * Compatibility matrix for conversions with QUECVT set.
142  * Granted mode is the row; requested mode is the column.
143  * Usage: matrix[grmode+1][rqmode+1]
144  */
145
146 static const int __quecvt_compat_matrix[8][8] = {
147       /* UN NL CR CW PR PW EX PD */
148         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
149         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
150         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
151         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
152         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
153         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
154         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
155         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
156 };
157
158 void dlm_print_lkb(struct dlm_lkb *lkb)
159 {
160         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
161                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
162                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
163                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
164                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
165 }
166
167 void dlm_print_rsb(struct dlm_rsb *r)
168 {
169         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
170                r->res_nodeid, r->res_flags, r->res_first_lkid,
171                r->res_recover_locks_count, r->res_name);
172 }
173
174 /* Threads cannot use the lockspace while it's being recovered */
175
176 static inline void lock_recovery(struct dlm_ls *ls)
177 {
178         down_read(&ls->ls_in_recovery);
179 }
180
181 static inline void unlock_recovery(struct dlm_ls *ls)
182 {
183         up_read(&ls->ls_in_recovery);
184 }
185
186 static inline int lock_recovery_try(struct dlm_ls *ls)
187 {
188         return down_read_trylock(&ls->ls_in_recovery);
189 }
190
191 static inline int can_be_queued(struct dlm_lkb *lkb)
192 {
193         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
194 }
195
196 static inline int force_blocking_asts(struct dlm_lkb *lkb)
197 {
198         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
199 }
200
201 static inline int is_demoted(struct dlm_lkb *lkb)
202 {
203         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
204 }
205
206 static inline int is_remote(struct dlm_rsb *r)
207 {
208         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
209         return !!r->res_nodeid;
210 }
211
212 static inline int is_process_copy(struct dlm_lkb *lkb)
213 {
214         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
215 }
216
217 static inline int is_master_copy(struct dlm_lkb *lkb)
218 {
219         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
220                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
221         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
222 }
223
224 static inline int middle_conversion(struct dlm_lkb *lkb)
225 {
226         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
227             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
228                 return 1;
229         return 0;
230 }
231
232 static inline int down_conversion(struct dlm_lkb *lkb)
233 {
234         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
235 }
236
237 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
238 {
239         if (is_master_copy(lkb))
240                 return;
241
242         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
243
244         lkb->lkb_lksb->sb_status = rv;
245         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
246
247         dlm_add_ast(lkb, AST_COMP);
248 }
249
250 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
251 {
252         if (is_master_copy(lkb))
253                 send_bast(r, lkb, rqmode);
254         else {
255                 lkb->lkb_bastmode = rqmode;
256                 dlm_add_ast(lkb, AST_BAST);
257         }
258 }
259
260 /*
261  * Basic operations on rsb's and lkb's
262  */
263
264 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
265 {
266         struct dlm_rsb *r;
267
268         r = allocate_rsb(ls, len);
269         if (!r)
270                 return NULL;
271
272         r->res_ls = ls;
273         r->res_length = len;
274         memcpy(r->res_name, name, len);
275         mutex_init(&r->res_mutex);
276
277         INIT_LIST_HEAD(&r->res_lookup);
278         INIT_LIST_HEAD(&r->res_grantqueue);
279         INIT_LIST_HEAD(&r->res_convertqueue);
280         INIT_LIST_HEAD(&r->res_waitqueue);
281         INIT_LIST_HEAD(&r->res_root_list);
282         INIT_LIST_HEAD(&r->res_recover_list);
283
284         return r;
285 }
286
287 static int search_rsb_list(struct list_head *head, char *name, int len,
288                            unsigned int flags, struct dlm_rsb **r_ret)
289 {
290         struct dlm_rsb *r;
291         int error = 0;
292
293         list_for_each_entry(r, head, res_hashchain) {
294                 if (len == r->res_length && !memcmp(name, r->res_name, len))
295                         goto found;
296         }
297         return -EBADR;
298
299  found:
300         if (r->res_nodeid && (flags & R_MASTER))
301                 error = -ENOTBLK;
302         *r_ret = r;
303         return error;
304 }
305
306 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
307                        unsigned int flags, struct dlm_rsb **r_ret)
308 {
309         struct dlm_rsb *r;
310         int error;
311
312         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
313         if (!error) {
314                 kref_get(&r->res_ref);
315                 goto out;
316         }
317         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
318         if (error)
319                 goto out;
320
321         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
322
323         if (dlm_no_directory(ls))
324                 goto out;
325
326         if (r->res_nodeid == -1) {
327                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
328                 r->res_first_lkid = 0;
329         } else if (r->res_nodeid > 0) {
330                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
331                 r->res_first_lkid = 0;
332         } else {
333                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
334                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
335         }
336  out:
337         *r_ret = r;
338         return error;
339 }
340
341 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
342                       unsigned int flags, struct dlm_rsb **r_ret)
343 {
344         int error;
345         write_lock(&ls->ls_rsbtbl[b].lock);
346         error = _search_rsb(ls, name, len, b, flags, r_ret);
347         write_unlock(&ls->ls_rsbtbl[b].lock);
348         return error;
349 }
350
351 /*
352  * Find rsb in rsbtbl and potentially create/add one
353  *
354  * Delaying the release of rsb's has a similar benefit to applications keeping
355  * NL locks on an rsb, but without the guarantee that the cached master value
356  * will still be valid when the rsb is reused.  Apps aren't always smart enough
357  * to keep NL locks on an rsb that they may lock again shortly; this can lead
358  * to excessive master lookups and removals if we don't delay the release.
359  *
360  * Searching for an rsb means looking through both the normal list and toss
361  * list.  When found on the toss list the rsb is moved to the normal list with
362  * ref count of 1; when found on normal list the ref count is incremented.
363  */
364
365 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
366                     unsigned int flags, struct dlm_rsb **r_ret)
367 {
368         struct dlm_rsb *r, *tmp;
369         uint32_t hash, bucket;
370         int error = 0;
371
372         if (dlm_no_directory(ls))
373                 flags |= R_CREATE;
374
375         hash = jhash(name, namelen, 0);
376         bucket = hash & (ls->ls_rsbtbl_size - 1);
377
378         error = search_rsb(ls, name, namelen, bucket, flags, &r);
379         if (!error)
380                 goto out;
381
382         if (error == -EBADR && !(flags & R_CREATE))
383                 goto out;
384
385         /* the rsb was found but wasn't a master copy */
386         if (error == -ENOTBLK)
387                 goto out;
388
389         error = -ENOMEM;
390         r = create_rsb(ls, name, namelen);
391         if (!r)
392                 goto out;
393
394         r->res_hash = hash;
395         r->res_bucket = bucket;
396         r->res_nodeid = -1;
397         kref_init(&r->res_ref);
398
399         /* With no directory, the master can be set immediately */
400         if (dlm_no_directory(ls)) {
401                 int nodeid = dlm_dir_nodeid(r);
402                 if (nodeid == dlm_our_nodeid())
403                         nodeid = 0;
404                 r->res_nodeid = nodeid;
405         }
406
407         write_lock(&ls->ls_rsbtbl[bucket].lock);
408         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
409         if (!error) {
410                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
411                 free_rsb(r);
412                 r = tmp;
413                 goto out;
414         }
415         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
416         write_unlock(&ls->ls_rsbtbl[bucket].lock);
417         error = 0;
418  out:
419         *r_ret = r;
420         return error;
421 }
422
423 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
424                  unsigned int flags, struct dlm_rsb **r_ret)
425 {
426         return find_rsb(ls, name, namelen, flags, r_ret);
427 }
428
429 /* This is only called to add a reference when the code already holds
430    a valid reference to the rsb, so there's no need for locking. */
431
432 static inline void hold_rsb(struct dlm_rsb *r)
433 {
434         kref_get(&r->res_ref);
435 }
436
437 void dlm_hold_rsb(struct dlm_rsb *r)
438 {
439         hold_rsb(r);
440 }
441
442 static void toss_rsb(struct kref *kref)
443 {
444         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
445         struct dlm_ls *ls = r->res_ls;
446
447         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
448         kref_init(&r->res_ref);
449         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
450         r->res_toss_time = jiffies;
451         if (r->res_lvbptr) {
452                 free_lvb(r->res_lvbptr);
453                 r->res_lvbptr = NULL;
454         }
455 }
456
457 /* When all references to the rsb are gone it's transfered to
458    the tossed list for later disposal. */
459
460 static void put_rsb(struct dlm_rsb *r)
461 {
462         struct dlm_ls *ls = r->res_ls;
463         uint32_t bucket = r->res_bucket;
464
465         write_lock(&ls->ls_rsbtbl[bucket].lock);
466         kref_put(&r->res_ref, toss_rsb);
467         write_unlock(&ls->ls_rsbtbl[bucket].lock);
468 }
469
470 void dlm_put_rsb(struct dlm_rsb *r)
471 {
472         put_rsb(r);
473 }
474
475 /* See comment for unhold_lkb */
476
477 static void unhold_rsb(struct dlm_rsb *r)
478 {
479         int rv;
480         rv = kref_put(&r->res_ref, toss_rsb);
481         DLM_ASSERT(!rv, dlm_print_rsb(r););
482 }
483
484 static void kill_rsb(struct kref *kref)
485 {
486         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
487
488         /* All work is done after the return from kref_put() so we
489            can release the write_lock before the remove and free. */
490
491         DLM_ASSERT(list_empty(&r->res_lookup),);
492         DLM_ASSERT(list_empty(&r->res_grantqueue),);
493         DLM_ASSERT(list_empty(&r->res_convertqueue),);
494         DLM_ASSERT(list_empty(&r->res_waitqueue),);
495         DLM_ASSERT(list_empty(&r->res_root_list),);
496         DLM_ASSERT(list_empty(&r->res_recover_list),);
497 }
498
499 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
500    The rsb must exist as long as any lkb's for it do. */
501
502 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
503 {
504         hold_rsb(r);
505         lkb->lkb_resource = r;
506 }
507
508 static void detach_lkb(struct dlm_lkb *lkb)
509 {
510         if (lkb->lkb_resource) {
511                 put_rsb(lkb->lkb_resource);
512                 lkb->lkb_resource = NULL;
513         }
514 }
515
516 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
517 {
518         struct dlm_lkb *lkb, *tmp;
519         uint32_t lkid = 0;
520         uint16_t bucket;
521
522         lkb = allocate_lkb(ls);
523         if (!lkb)
524                 return -ENOMEM;
525
526         lkb->lkb_nodeid = -1;
527         lkb->lkb_grmode = DLM_LOCK_IV;
528         kref_init(&lkb->lkb_ref);
529
530         get_random_bytes(&bucket, sizeof(bucket));
531         bucket &= (ls->ls_lkbtbl_size - 1);
532
533         write_lock(&ls->ls_lkbtbl[bucket].lock);
534
535         /* counter can roll over so we must verify lkid is not in use */
536
537         while (lkid == 0) {
538                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
539
540                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
541                                     lkb_idtbl_list) {
542                         if (tmp->lkb_id != lkid)
543                                 continue;
544                         lkid = 0;
545                         break;
546                 }
547         }
548
549         lkb->lkb_id = lkid;
550         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
551         write_unlock(&ls->ls_lkbtbl[bucket].lock);
552
553         *lkb_ret = lkb;
554         return 0;
555 }
556
557 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
558 {
559         uint16_t bucket = lkid & 0xFFFF;
560         struct dlm_lkb *lkb;
561
562         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
563                 if (lkb->lkb_id == lkid)
564                         return lkb;
565         }
566         return NULL;
567 }
568
569 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
570 {
571         struct dlm_lkb *lkb;
572         uint16_t bucket = lkid & 0xFFFF;
573
574         if (bucket >= ls->ls_lkbtbl_size)
575                 return -EBADSLT;
576
577         read_lock(&ls->ls_lkbtbl[bucket].lock);
578         lkb = __find_lkb(ls, lkid);
579         if (lkb)
580                 kref_get(&lkb->lkb_ref);
581         read_unlock(&ls->ls_lkbtbl[bucket].lock);
582
583         *lkb_ret = lkb;
584         return lkb ? 0 : -ENOENT;
585 }
586
587 static void kill_lkb(struct kref *kref)
588 {
589         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
590
591         /* All work is done after the return from kref_put() so we
592            can release the write_lock before the detach_lkb */
593
594         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
595 }
596
597 /* __put_lkb() is used when an lkb may not have an rsb attached to
598    it so we need to provide the lockspace explicitly */
599
600 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
601 {
602         uint16_t bucket = lkb->lkb_id & 0xFFFF;
603
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
606                 list_del(&lkb->lkb_idtbl_list);
607                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
608
609                 detach_lkb(lkb);
610
611                 /* for local/process lkbs, lvbptr points to caller's lksb */
612                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
613                         free_lvb(lkb->lkb_lvbptr);
614                 free_lkb(lkb);
615                 return 1;
616         } else {
617                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
618                 return 0;
619         }
620 }
621
622 int dlm_put_lkb(struct dlm_lkb *lkb)
623 {
624         struct dlm_ls *ls;
625
626         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
627         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
628
629         ls = lkb->lkb_resource->res_ls;
630         return __put_lkb(ls, lkb);
631 }
632
633 /* This is only called to add a reference when the code already holds
634    a valid reference to the lkb, so there's no need for locking. */
635
636 static inline void hold_lkb(struct dlm_lkb *lkb)
637 {
638         kref_get(&lkb->lkb_ref);
639 }
640
641 /* This is called when we need to remove a reference and are certain
642    it's not the last ref.  e.g. del_lkb is always called between a
643    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
644    put_lkb would work fine, but would involve unnecessary locking */
645
646 static inline void unhold_lkb(struct dlm_lkb *lkb)
647 {
648         int rv;
649         rv = kref_put(&lkb->lkb_ref, kill_lkb);
650         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
651 }
652
653 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
654                             int mode)
655 {
656         struct dlm_lkb *lkb = NULL;
657
658         list_for_each_entry(lkb, head, lkb_statequeue)
659                 if (lkb->lkb_rqmode < mode)
660                         break;
661
662         if (!lkb)
663                 list_add_tail(new, head);
664         else
665                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
666 }
667
668 /* add/remove lkb to rsb's grant/convert/wait queue */
669
670 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
671 {
672         kref_get(&lkb->lkb_ref);
673
674         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
675
676         lkb->lkb_status = status;
677
678         switch (status) {
679         case DLM_LKSTS_WAITING:
680                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
681                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
682                 else
683                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
684                 break;
685         case DLM_LKSTS_GRANTED:
686                 /* convention says granted locks kept in order of grmode */
687                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
688                                 lkb->lkb_grmode);
689                 break;
690         case DLM_LKSTS_CONVERT:
691                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
692                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
693                 else
694                         list_add_tail(&lkb->lkb_statequeue,
695                                       &r->res_convertqueue);
696                 break;
697         default:
698                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
699         }
700 }
701
702 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
703 {
704         lkb->lkb_status = 0;
705         list_del(&lkb->lkb_statequeue);
706         unhold_lkb(lkb);
707 }
708
709 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
710 {
711         hold_lkb(lkb);
712         del_lkb(r, lkb);
713         add_lkb(r, lkb, sts);
714         unhold_lkb(lkb);
715 }
716
717 /* add/remove lkb from global waiters list of lkb's waiting for
718    a reply from a remote node */
719
720 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
721 {
722         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
723
724         mutex_lock(&ls->ls_waiters_mutex);
725         if (lkb->lkb_wait_type) {
726                 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
727                 goto out;
728         }
729         lkb->lkb_wait_type = mstype;
730         kref_get(&lkb->lkb_ref);
731         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
732  out:
733         mutex_unlock(&ls->ls_waiters_mutex);
734 }
735
736 static int _remove_from_waiters(struct dlm_lkb *lkb)
737 {
738         int error = 0;
739
740         if (!lkb->lkb_wait_type) {
741                 log_print("remove_from_waiters error");
742                 error = -EINVAL;
743                 goto out;
744         }
745         lkb->lkb_wait_type = 0;
746         list_del(&lkb->lkb_wait_reply);
747         unhold_lkb(lkb);
748  out:
749         return error;
750 }
751
752 static int remove_from_waiters(struct dlm_lkb *lkb)
753 {
754         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
755         int error;
756
757         mutex_lock(&ls->ls_waiters_mutex);
758         error = _remove_from_waiters(lkb);
759         mutex_unlock(&ls->ls_waiters_mutex);
760         return error;
761 }
762
763 static void dir_remove(struct dlm_rsb *r)
764 {
765         int to_nodeid;
766
767         if (dlm_no_directory(r->res_ls))
768                 return;
769
770         to_nodeid = dlm_dir_nodeid(r);
771         if (to_nodeid != dlm_our_nodeid())
772                 send_remove(r);
773         else
774                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
775                                      r->res_name, r->res_length);
776 }
777
778 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
779    found since they are in order of newest to oldest? */
780
781 static int shrink_bucket(struct dlm_ls *ls, int b)
782 {
783         struct dlm_rsb *r;
784         int count = 0, found;
785
786         for (;;) {
787                 found = 0;
788                 write_lock(&ls->ls_rsbtbl[b].lock);
789                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
790                                             res_hashchain) {
791                         if (!time_after_eq(jiffies, r->res_toss_time +
792                                            dlm_config.toss_secs * HZ))
793                                 continue;
794                         found = 1;
795                         break;
796                 }
797
798                 if (!found) {
799                         write_unlock(&ls->ls_rsbtbl[b].lock);
800                         break;
801                 }
802
803                 if (kref_put(&r->res_ref, kill_rsb)) {
804                         list_del(&r->res_hashchain);
805                         write_unlock(&ls->ls_rsbtbl[b].lock);
806
807                         if (is_master(r))
808                                 dir_remove(r);
809                         free_rsb(r);
810                         count++;
811                 } else {
812                         write_unlock(&ls->ls_rsbtbl[b].lock);
813                         log_error(ls, "tossed rsb in use %s", r->res_name);
814                 }
815         }
816
817         return count;
818 }
819
820 void dlm_scan_rsbs(struct dlm_ls *ls)
821 {
822         int i;
823
824         if (dlm_locking_stopped(ls))
825                 return;
826
827         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
828                 shrink_bucket(ls, i);
829                 cond_resched();
830         }
831 }
832
833 /* lkb is master or local copy */
834
835 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
836 {
837         int b, len = r->res_ls->ls_lvblen;
838
839         /* b=1 lvb returned to caller
840            b=0 lvb written to rsb or invalidated
841            b=-1 do nothing */
842
843         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
844
845         if (b == 1) {
846                 if (!lkb->lkb_lvbptr)
847                         return;
848
849                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
850                         return;
851
852                 if (!r->res_lvbptr)
853                         return;
854
855                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
856                 lkb->lkb_lvbseq = r->res_lvbseq;
857
858         } else if (b == 0) {
859                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
860                         rsb_set_flag(r, RSB_VALNOTVALID);
861                         return;
862                 }
863
864                 if (!lkb->lkb_lvbptr)
865                         return;
866
867                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
868                         return;
869
870                 if (!r->res_lvbptr)
871                         r->res_lvbptr = allocate_lvb(r->res_ls);
872
873                 if (!r->res_lvbptr)
874                         return;
875
876                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
877                 r->res_lvbseq++;
878                 lkb->lkb_lvbseq = r->res_lvbseq;
879                 rsb_clear_flag(r, RSB_VALNOTVALID);
880         }
881
882         if (rsb_flag(r, RSB_VALNOTVALID))
883                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
884 }
885
886 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
887 {
888         if (lkb->lkb_grmode < DLM_LOCK_PW)
889                 return;
890
891         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
892                 rsb_set_flag(r, RSB_VALNOTVALID);
893                 return;
894         }
895
896         if (!lkb->lkb_lvbptr)
897                 return;
898
899         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
900                 return;
901
902         if (!r->res_lvbptr)
903                 r->res_lvbptr = allocate_lvb(r->res_ls);
904
905         if (!r->res_lvbptr)
906                 return;
907
908         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
909         r->res_lvbseq++;
910         rsb_clear_flag(r, RSB_VALNOTVALID);
911 }
912
913 /* lkb is process copy (pc) */
914
915 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
916                             struct dlm_message *ms)
917 {
918         int b;
919
920         if (!lkb->lkb_lvbptr)
921                 return;
922
923         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
924                 return;
925
926         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
927         if (b == 1) {
928                 int len = receive_extralen(ms);
929                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
930                 lkb->lkb_lvbseq = ms->m_lvbseq;
931         }
932 }
933
934 /* Manipulate lkb's on rsb's convert/granted/waiting queues
935    remove_lock -- used for unlock, removes lkb from granted
936    revert_lock -- used for cancel, moves lkb from convert to granted
937    grant_lock  -- used for request and convert, adds lkb to granted or
938                   moves lkb from convert or waiting to granted
939
940    Each of these is used for master or local copy lkb's.  There is
941    also a _pc() variation used to make the corresponding change on
942    a process copy (pc) lkb. */
943
944 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
945 {
946         del_lkb(r, lkb);
947         lkb->lkb_grmode = DLM_LOCK_IV;
948         /* this unhold undoes the original ref from create_lkb()
949            so this leads to the lkb being freed */
950         unhold_lkb(lkb);
951 }
952
953 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
954 {
955         set_lvb_unlock(r, lkb);
956         _remove_lock(r, lkb);
957 }
958
959 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
960 {
961         _remove_lock(r, lkb);
962 }
963
964 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
965 {
966         lkb->lkb_rqmode = DLM_LOCK_IV;
967
968         switch (lkb->lkb_status) {
969         case DLM_LKSTS_GRANTED:
970                 break;
971         case DLM_LKSTS_CONVERT:
972                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
973                 break;
974         case DLM_LKSTS_WAITING:
975                 del_lkb(r, lkb);
976                 lkb->lkb_grmode = DLM_LOCK_IV;
977                 /* this unhold undoes the original ref from create_lkb()
978                    so this leads to the lkb being freed */
979                 unhold_lkb(lkb);
980                 break;
981         default:
982                 log_print("invalid status for revert %d", lkb->lkb_status);
983         }
984 }
985
986 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
987 {
988         revert_lock(r, lkb);
989 }
990
991 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
992 {
993         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
994                 lkb->lkb_grmode = lkb->lkb_rqmode;
995                 if (lkb->lkb_status)
996                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
997                 else
998                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
999         }
1000
1001         lkb->lkb_rqmode = DLM_LOCK_IV;
1002 }
1003
1004 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1005 {
1006         set_lvb_lock(r, lkb);
1007         _grant_lock(r, lkb);
1008         lkb->lkb_highbast = 0;
1009 }
1010
1011 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1012                           struct dlm_message *ms)
1013 {
1014         set_lvb_lock_pc(r, lkb, ms);
1015         _grant_lock(r, lkb);
1016 }
1017
1018 /* called by grant_pending_locks() which means an async grant message must
1019    be sent to the requesting node in addition to granting the lock if the
1020    lkb belongs to a remote node. */
1021
1022 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1023 {
1024         grant_lock(r, lkb);
1025         if (is_master_copy(lkb))
1026                 send_grant(r, lkb);
1027         else
1028                 queue_cast(r, lkb, 0);
1029 }
1030
1031 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1032 {
1033         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1034                                            lkb_statequeue);
1035         if (lkb->lkb_id == first->lkb_id)
1036                 return 1;
1037
1038         return 0;
1039 }
1040
1041 /* Check if the given lkb conflicts with another lkb on the queue. */
1042
1043 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1044 {
1045         struct dlm_lkb *this;
1046
1047         list_for_each_entry(this, head, lkb_statequeue) {
1048                 if (this == lkb)
1049                         continue;
1050                 if (!modes_compat(this, lkb))
1051                         return 1;
1052         }
1053         return 0;
1054 }
1055
1056 /*
1057  * "A conversion deadlock arises with a pair of lock requests in the converting
1058  * queue for one resource.  The granted mode of each lock blocks the requested
1059  * mode of the other lock."
1060  *
1061  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1062  * convert queue from being granted, then demote lkb (set grmode to NL).
1063  * This second form requires that we check for conv-deadlk even when
1064  * now == 0 in _can_be_granted().
1065  *
1066  * Example:
1067  * Granted Queue: empty
1068  * Convert Queue: NL->EX (first lock)
1069  *                PR->EX (second lock)
1070  *
1071  * The first lock can't be granted because of the granted mode of the second
1072  * lock and the second lock can't be granted because it's not first in the
1073  * list.  We demote the granted mode of the second lock (the lkb passed to this
1074  * function).
1075  *
1076  * After the resolution, the "grant pending" function needs to go back and try
1077  * to grant locks on the convert queue again since the first lock can now be
1078  * granted.
1079  */
1080
1081 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1082 {
1083         struct dlm_lkb *this, *first = NULL, *self = NULL;
1084
1085         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1086                 if (!first)
1087                         first = this;
1088                 if (this == lkb) {
1089                         self = lkb;
1090                         continue;
1091                 }
1092
1093                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1094                         return 1;
1095         }
1096
1097         /* if lkb is on the convert queue and is preventing the first
1098            from being granted, then there's deadlock and we demote lkb.
1099            multiple converting locks may need to do this before the first
1100            converting lock can be granted. */
1101
1102         if (self && self != first) {
1103                 if (!modes_compat(lkb, first) &&
1104                     !queue_conflict(&rsb->res_grantqueue, first))
1105                         return 1;
1106         }
1107
1108         return 0;
1109 }
1110
1111 /*
1112  * Return 1 if the lock can be granted, 0 otherwise.
1113  * Also detect and resolve conversion deadlocks.
1114  *
1115  * lkb is the lock to be granted
1116  *
1117  * now is 1 if the function is being called in the context of the
1118  * immediate request, it is 0 if called later, after the lock has been
1119  * queued.
1120  *
1121  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1122  */
1123
1124 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1125 {
1126         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1127
1128         /*
1129          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1130          * a new request for a NL mode lock being blocked.
1131          *
1132          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1133          * request, then it would be granted.  In essence, the use of this flag
1134          * tells the Lock Manager to expedite theis request by not considering
1135          * what may be in the CONVERTING or WAITING queues...  As of this
1136          * writing, the EXPEDITE flag can be used only with new requests for NL
1137          * mode locks.  This flag is not valid for conversion requests.
1138          *
1139          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1140          * conversion or used with a non-NL requested mode.  We also know an
1141          * EXPEDITE request is always granted immediately, so now must always
1142          * be 1.  The full condition to grant an expedite request: (now &&
1143          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1144          * therefore be shortened to just checking the flag.
1145          */
1146
1147         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1148                 return 1;
1149
1150         /*
1151          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1152          * added to the remaining conditions.
1153          */
1154
1155         if (queue_conflict(&r->res_grantqueue, lkb))
1156                 goto out;
1157
1158         /*
1159          * 6-3: By default, a conversion request is immediately granted if the
1160          * requested mode is compatible with the modes of all other granted
1161          * locks
1162          */
1163
1164         if (queue_conflict(&r->res_convertqueue, lkb))
1165                 goto out;
1166
1167         /*
1168          * 6-5: But the default algorithm for deciding whether to grant or
1169          * queue conversion requests does not by itself guarantee that such
1170          * requests are serviced on a "first come first serve" basis.  This, in
1171          * turn, can lead to a phenomenon known as "indefinate postponement".
1172          *
1173          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1174          * the system service employed to request a lock conversion.  This flag
1175          * forces certain conversion requests to be queued, even if they are
1176          * compatible with the granted modes of other locks on the same
1177          * resource.  Thus, the use of this flag results in conversion requests
1178          * being ordered on a "first come first servce" basis.
1179          *
1180          * DCT: This condition is all about new conversions being able to occur
1181          * "in place" while the lock remains on the granted queue (assuming
1182          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1183          * doesn't _have_ to go onto the convert queue where it's processed in
1184          * order.  The "now" variable is necessary to distinguish converts
1185          * being received and processed for the first time now, because once a
1186          * convert is moved to the conversion queue the condition below applies
1187          * requiring fifo granting.
1188          */
1189
1190         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1191                 return 1;
1192
1193         /*
1194          * The NOORDER flag is set to avoid the standard vms rules on grant
1195          * order.
1196          */
1197
1198         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1199                 return 1;
1200
1201         /*
1202          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1203          * granted until all other conversion requests ahead of it are granted
1204          * and/or canceled.
1205          */
1206
1207         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1208                 return 1;
1209
1210         /*
1211          * 6-4: By default, a new request is immediately granted only if all
1212          * three of the following conditions are satisfied when the request is
1213          * issued:
1214          * - The queue of ungranted conversion requests for the resource is
1215          *   empty.
1216          * - The queue of ungranted new requests for the resource is empty.
1217          * - The mode of the new request is compatible with the most
1218          *   restrictive mode of all granted locks on the resource.
1219          */
1220
1221         if (now && !conv && list_empty(&r->res_convertqueue) &&
1222             list_empty(&r->res_waitqueue))
1223                 return 1;
1224
1225         /*
1226          * 6-4: Once a lock request is in the queue of ungranted new requests,
1227          * it cannot be granted until the queue of ungranted conversion
1228          * requests is empty, all ungranted new requests ahead of it are
1229          * granted and/or canceled, and it is compatible with the granted mode
1230          * of the most restrictive lock granted on the resource.
1231          */
1232
1233         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1234             first_in_list(lkb, &r->res_waitqueue))
1235                 return 1;
1236
1237  out:
1238         /*
1239          * The following, enabled by CONVDEADLK, departs from VMS.
1240          */
1241
1242         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1243             conversion_deadlock_detect(r, lkb)) {
1244                 lkb->lkb_grmode = DLM_LOCK_NL;
1245                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1246         }
1247
1248         return 0;
1249 }
1250
1251 /*
1252  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1253  * simple way to provide a big optimization to applications that can use them.
1254  */
1255
1256 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1257 {
1258         uint32_t flags = lkb->lkb_exflags;
1259         int rv;
1260         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1261
1262         rv = _can_be_granted(r, lkb, now);
1263         if (rv)
1264                 goto out;
1265
1266         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1267                 goto out;
1268
1269         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1270                 alt = DLM_LOCK_PR;
1271         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1272                 alt = DLM_LOCK_CW;
1273
1274         if (alt) {
1275                 lkb->lkb_rqmode = alt;
1276                 rv = _can_be_granted(r, lkb, now);
1277                 if (rv)
1278                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1279                 else
1280                         lkb->lkb_rqmode = rqmode;
1281         }
1282  out:
1283         return rv;
1284 }
1285
1286 static int grant_pending_convert(struct dlm_rsb *r, int high)
1287 {
1288         struct dlm_lkb *lkb, *s;
1289         int hi, demoted, quit, grant_restart, demote_restart;
1290
1291         quit = 0;
1292  restart:
1293         grant_restart = 0;
1294         demote_restart = 0;
1295         hi = DLM_LOCK_IV;
1296
1297         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1298                 demoted = is_demoted(lkb);
1299                 if (can_be_granted(r, lkb, 0)) {
1300                         grant_lock_pending(r, lkb);
1301                         grant_restart = 1;
1302                 } else {
1303                         hi = max_t(int, lkb->lkb_rqmode, hi);
1304                         if (!demoted && is_demoted(lkb))
1305                                 demote_restart = 1;
1306                 }
1307         }
1308
1309         if (grant_restart)
1310                 goto restart;
1311         if (demote_restart && !quit) {
1312                 quit = 1;
1313                 goto restart;
1314         }
1315
1316         return max_t(int, high, hi);
1317 }
1318
1319 static int grant_pending_wait(struct dlm_rsb *r, int high)
1320 {
1321         struct dlm_lkb *lkb, *s;
1322
1323         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1324                 if (can_be_granted(r, lkb, 0))
1325                         grant_lock_pending(r, lkb);
1326                 else
1327                         high = max_t(int, lkb->lkb_rqmode, high);
1328         }
1329
1330         return high;
1331 }
1332
1333 static void grant_pending_locks(struct dlm_rsb *r)
1334 {
1335         struct dlm_lkb *lkb, *s;
1336         int high = DLM_LOCK_IV;
1337
1338         DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1339
1340         high = grant_pending_convert(r, high);
1341         high = grant_pending_wait(r, high);
1342
1343         if (high == DLM_LOCK_IV)
1344                 return;
1345
1346         /*
1347          * If there are locks left on the wait/convert queue then send blocking
1348          * ASTs to granted locks based on the largest requested mode (high)
1349          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1350          */
1351
1352         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1353                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1354                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1355                         queue_bast(r, lkb, high);
1356                         lkb->lkb_highbast = high;
1357                 }
1358         }
1359 }
1360
1361 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1362                             struct dlm_lkb *lkb)
1363 {
1364         struct dlm_lkb *gr;
1365
1366         list_for_each_entry(gr, head, lkb_statequeue) {
1367                 if (gr->lkb_bastaddr &&
1368                     gr->lkb_highbast < lkb->lkb_rqmode &&
1369                     !modes_compat(gr, lkb)) {
1370                         queue_bast(r, gr, lkb->lkb_rqmode);
1371                         gr->lkb_highbast = lkb->lkb_rqmode;
1372                 }
1373         }
1374 }
1375
1376 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1377 {
1378         send_bast_queue(r, &r->res_grantqueue, lkb);
1379 }
1380
1381 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1382 {
1383         send_bast_queue(r, &r->res_grantqueue, lkb);
1384         send_bast_queue(r, &r->res_convertqueue, lkb);
1385 }
1386
1387 /* set_master(r, lkb) -- set the master nodeid of a resource
1388
1389    The purpose of this function is to set the nodeid field in the given
1390    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1391    known, it can just be copied to the lkb and the function will return
1392    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1393    before it can be copied to the lkb.
1394
1395    When the rsb nodeid is being looked up remotely, the initial lkb
1396    causing the lookup is kept on the ls_waiters list waiting for the
1397    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1398    on the rsb's res_lookup list until the master is verified.
1399
1400    Return values:
1401    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1402    1: the rsb master is not available and the lkb has been placed on
1403       a wait queue
1404 */
1405
1406 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1407 {
1408         struct dlm_ls *ls = r->res_ls;
1409         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1410
1411         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1412                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1413                 r->res_first_lkid = lkb->lkb_id;
1414                 lkb->lkb_nodeid = r->res_nodeid;
1415                 return 0;
1416         }
1417
1418         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1419                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1420                 return 1;
1421         }
1422
1423         if (r->res_nodeid == 0) {
1424                 lkb->lkb_nodeid = 0;
1425                 return 0;
1426         }
1427
1428         if (r->res_nodeid > 0) {
1429                 lkb->lkb_nodeid = r->res_nodeid;
1430                 return 0;
1431         }
1432
1433         DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1434
1435         dir_nodeid = dlm_dir_nodeid(r);
1436
1437         if (dir_nodeid != our_nodeid) {
1438                 r->res_first_lkid = lkb->lkb_id;
1439                 send_lookup(r, lkb);
1440                 return 1;
1441         }
1442
1443         for (;;) {
1444                 /* It's possible for dlm_scand to remove an old rsb for
1445                    this same resource from the toss list, us to create
1446                    a new one, look up the master locally, and find it
1447                    already exists just before dlm_scand does the
1448                    dir_remove() on the previous rsb. */
1449
1450                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1451                                        r->res_length, &ret_nodeid);
1452                 if (!error)
1453                         break;
1454                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1455                 schedule();
1456         }
1457
1458         if (ret_nodeid == our_nodeid) {
1459                 r->res_first_lkid = 0;
1460                 r->res_nodeid = 0;
1461                 lkb->lkb_nodeid = 0;
1462         } else {
1463                 r->res_first_lkid = lkb->lkb_id;
1464                 r->res_nodeid = ret_nodeid;
1465                 lkb->lkb_nodeid = ret_nodeid;
1466         }
1467         return 0;
1468 }
1469
1470 static void process_lookup_list(struct dlm_rsb *r)
1471 {
1472         struct dlm_lkb *lkb, *safe;
1473
1474         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1475                 list_del(&lkb->lkb_rsb_lookup);
1476                 _request_lock(r, lkb);
1477                 schedule();
1478         }
1479 }
1480
1481 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1482
1483 static void confirm_master(struct dlm_rsb *r, int error)
1484 {
1485         struct dlm_lkb *lkb;
1486
1487         if (!r->res_first_lkid)
1488                 return;
1489
1490         switch (error) {
1491         case 0:
1492         case -EINPROGRESS:
1493                 r->res_first_lkid = 0;
1494                 process_lookup_list(r);
1495                 break;
1496
1497         case -EAGAIN:
1498                 /* the remote master didn't queue our NOQUEUE request;
1499                    make a waiting lkb the first_lkid */
1500
1501                 r->res_first_lkid = 0;
1502
1503                 if (!list_empty(&r->res_lookup)) {
1504                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1505                                          lkb_rsb_lookup);
1506                         list_del(&lkb->lkb_rsb_lookup);
1507                         r->res_first_lkid = lkb->lkb_id;
1508                         _request_lock(r, lkb);
1509                 } else
1510                         r->res_nodeid = -1;
1511                 break;
1512
1513         default:
1514                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1515         }
1516 }
1517
1518 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1519                          int namelen, uint32_t parent_lkid, void *ast,
1520                          void *astarg, void *bast, struct dlm_args *args)
1521 {
1522         int rv = -EINVAL;
1523
1524         /* check for invalid arg usage */
1525
1526         if (mode < 0 || mode > DLM_LOCK_EX)
1527                 goto out;
1528
1529         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1530                 goto out;
1531
1532         if (flags & DLM_LKF_CANCEL)
1533                 goto out;
1534
1535         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1536                 goto out;
1537
1538         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1539                 goto out;
1540
1541         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1542                 goto out;
1543
1544         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1545                 goto out;
1546
1547         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1548                 goto out;
1549
1550         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1551                 goto out;
1552
1553         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1554                 goto out;
1555
1556         if (!ast || !lksb)
1557                 goto out;
1558
1559         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1560                 goto out;
1561
1562         /* parent/child locks not yet supported */
1563         if (parent_lkid)
1564                 goto out;
1565
1566         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1567                 goto out;
1568
1569         /* these args will be copied to the lkb in validate_lock_args,
1570            it cannot be done now because when converting locks, fields in
1571            an active lkb cannot be modified before locking the rsb */
1572
1573         args->flags = flags;
1574         args->astaddr = ast;
1575         args->astparam = (long) astarg;
1576         args->bastaddr = bast;
1577         args->mode = mode;
1578         args->lksb = lksb;
1579         rv = 0;
1580  out:
1581         return rv;
1582 }
1583
1584 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1585 {
1586         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1587                       DLM_LKF_FORCEUNLOCK))
1588                 return -EINVAL;
1589
1590         args->flags = flags;
1591         args->astparam = (long) astarg;
1592         return 0;
1593 }
1594
1595 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1596                               struct dlm_args *args)
1597 {
1598         int rv = -EINVAL;
1599
1600         if (args->flags & DLM_LKF_CONVERT) {
1601                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1602                         goto out;
1603
1604                 if (args->flags & DLM_LKF_QUECVT &&
1605                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1606                         goto out;
1607
1608                 rv = -EBUSY;
1609                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1610                         goto out;
1611
1612                 if (lkb->lkb_wait_type)
1613                         goto out;
1614         }
1615
1616         lkb->lkb_exflags = args->flags;
1617         lkb->lkb_sbflags = 0;
1618         lkb->lkb_astaddr = args->astaddr;
1619         lkb->lkb_astparam = args->astparam;
1620         lkb->lkb_bastaddr = args->bastaddr;
1621         lkb->lkb_rqmode = args->mode;
1622         lkb->lkb_lksb = args->lksb;
1623         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1624         lkb->lkb_ownpid = (int) current->pid;
1625         rv = 0;
1626  out:
1627         return rv;
1628 }
1629
1630 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1631 {
1632         int rv = -EINVAL;
1633
1634         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1635                 goto out;
1636
1637         if (args->flags & DLM_LKF_FORCEUNLOCK)
1638                 goto out_ok;
1639
1640         if (args->flags & DLM_LKF_CANCEL &&
1641             lkb->lkb_status == DLM_LKSTS_GRANTED)
1642                 goto out;
1643
1644         if (!(args->flags & DLM_LKF_CANCEL) &&
1645             lkb->lkb_status != DLM_LKSTS_GRANTED)
1646                 goto out;
1647
1648         rv = -EBUSY;
1649         if (lkb->lkb_wait_type)
1650                 goto out;
1651
1652  out_ok:
1653         lkb->lkb_exflags = args->flags;
1654         lkb->lkb_sbflags = 0;
1655         lkb->lkb_astparam = args->astparam;
1656
1657         rv = 0;
1658  out:
1659         return rv;
1660 }
1661
1662 /*
1663  * Four stage 4 varieties:
1664  * do_request(), do_convert(), do_unlock(), do_cancel()
1665  * These are called on the master node for the given lock and
1666  * from the central locking logic.
1667  */
1668
1669 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1670 {
1671         int error = 0;
1672
1673         if (can_be_granted(r, lkb, 1)) {
1674                 grant_lock(r, lkb);
1675                 queue_cast(r, lkb, 0);
1676                 goto out;
1677         }
1678
1679         if (can_be_queued(lkb)) {
1680                 error = -EINPROGRESS;
1681                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1682                 send_blocking_asts(r, lkb);
1683                 goto out;
1684         }
1685
1686         error = -EAGAIN;
1687         if (force_blocking_asts(lkb))
1688                 send_blocking_asts_all(r, lkb);
1689         queue_cast(r, lkb, -EAGAIN);
1690
1691  out:
1692         return error;
1693 }
1694
1695 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1696 {
1697         int error = 0;
1698
1699         /* changing an existing lock may allow others to be granted */
1700
1701         if (can_be_granted(r, lkb, 1)) {
1702                 grant_lock(r, lkb);
1703                 queue_cast(r, lkb, 0);
1704                 grant_pending_locks(r);
1705                 goto out;
1706         }
1707
1708         if (can_be_queued(lkb)) {
1709                 if (is_demoted(lkb))
1710                         grant_pending_locks(r);
1711                 error = -EINPROGRESS;
1712                 del_lkb(r, lkb);
1713                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1714                 send_blocking_asts(r, lkb);
1715                 goto out;
1716         }
1717
1718         error = -EAGAIN;
1719         if (force_blocking_asts(lkb))
1720                 send_blocking_asts_all(r, lkb);
1721         queue_cast(r, lkb, -EAGAIN);
1722
1723  out:
1724         return error;
1725 }
1726
1727 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1728 {
1729         remove_lock(r, lkb);
1730         queue_cast(r, lkb, -DLM_EUNLOCK);
1731         grant_pending_locks(r);
1732         return -DLM_EUNLOCK;
1733 }
1734
1735 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1736    skip the queue_cast(ECANCEL).  It indicates that the request/convert
1737    completed (and queued a normal ast) just before the cancel; we don't
1738    want to clobber the sb_result for the normal ast with ECANCEL. */
1739    
1740 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1741 {
1742         revert_lock(r, lkb);
1743         queue_cast(r, lkb, -DLM_ECANCEL);
1744         grant_pending_locks(r);
1745         return -DLM_ECANCEL;
1746 }
1747
1748 /*
1749  * Four stage 3 varieties:
1750  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1751  */
1752
1753 /* add a new lkb to a possibly new rsb, called by requesting process */
1754
1755 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1756 {
1757         int error;
1758
1759         /* set_master: sets lkb nodeid from r */
1760
1761         error = set_master(r, lkb);
1762         if (error < 0)
1763                 goto out;
1764         if (error) {
1765                 error = 0;
1766                 goto out;
1767         }
1768
1769         if (is_remote(r))
1770                 /* receive_request() calls do_request() on remote node */
1771                 error = send_request(r, lkb);
1772         else
1773                 error = do_request(r, lkb);
1774  out:
1775         return error;
1776 }
1777
1778 /* change some property of an existing lkb, e.g. mode */
1779
1780 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1781 {
1782         int error;
1783
1784         if (is_remote(r))
1785                 /* receive_convert() calls do_convert() on remote node */
1786                 error = send_convert(r, lkb);
1787         else
1788                 error = do_convert(r, lkb);
1789
1790         return error;
1791 }
1792
1793 /* remove an existing lkb from the granted queue */
1794
1795 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1796 {
1797         int error;
1798
1799         if (is_remote(r))
1800                 /* receive_unlock() calls do_unlock() on remote node */
1801                 error = send_unlock(r, lkb);
1802         else
1803                 error = do_unlock(r, lkb);
1804
1805         return error;
1806 }
1807
1808 /* remove an existing lkb from the convert or wait queue */
1809
1810 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1811 {
1812         int error;
1813
1814         if (is_remote(r))
1815                 /* receive_cancel() calls do_cancel() on remote node */
1816                 error = send_cancel(r, lkb);
1817         else
1818                 error = do_cancel(r, lkb);
1819
1820         return error;
1821 }
1822
1823 /*
1824  * Four stage 2 varieties:
1825  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1826  */
1827
1828 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1829                         int len, struct dlm_args *args)
1830 {
1831         struct dlm_rsb *r;
1832         int error;
1833
1834         error = validate_lock_args(ls, lkb, args);
1835         if (error)
1836                 goto out;
1837
1838         error = find_rsb(ls, name, len, R_CREATE, &r);
1839         if (error)
1840                 goto out;
1841
1842         lock_rsb(r);
1843
1844         attach_lkb(r, lkb);
1845         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1846
1847         error = _request_lock(r, lkb);
1848
1849         unlock_rsb(r);
1850         put_rsb(r);
1851
1852  out:
1853         return error;
1854 }
1855
1856 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1857                         struct dlm_args *args)
1858 {
1859         struct dlm_rsb *r;
1860         int error;
1861
1862         r = lkb->lkb_resource;
1863
1864         hold_rsb(r);
1865         lock_rsb(r);
1866
1867         error = validate_lock_args(ls, lkb, args);
1868         if (error)
1869                 goto out;
1870
1871         error = _convert_lock(r, lkb);
1872  out:
1873         unlock_rsb(r);
1874         put_rsb(r);
1875         return error;
1876 }
1877
1878 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1879                        struct dlm_args *args)
1880 {
1881         struct dlm_rsb *r;
1882         int error;
1883
1884         r = lkb->lkb_resource;
1885
1886         hold_rsb(r);
1887         lock_rsb(r);
1888
1889         error = validate_unlock_args(lkb, args);
1890         if (error)
1891                 goto out;
1892
1893         error = _unlock_lock(r, lkb);
1894  out:
1895         unlock_rsb(r);
1896         put_rsb(r);
1897         return error;
1898 }
1899
1900 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1901                        struct dlm_args *args)
1902 {
1903         struct dlm_rsb *r;
1904         int error;
1905
1906         r = lkb->lkb_resource;
1907
1908         hold_rsb(r);
1909         lock_rsb(r);
1910
1911         error = validate_unlock_args(lkb, args);
1912         if (error)
1913                 goto out;
1914
1915         error = _cancel_lock(r, lkb);
1916  out:
1917         unlock_rsb(r);
1918         put_rsb(r);
1919         return error;
1920 }
1921
1922 /*
1923  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1924  */
1925
1926 int dlm_lock(dlm_lockspace_t *lockspace,
1927              int mode,
1928              struct dlm_lksb *lksb,
1929              uint32_t flags,
1930              void *name,
1931              unsigned int namelen,
1932              uint32_t parent_lkid,
1933              void (*ast) (void *astarg),
1934              void *astarg,
1935              void (*bast) (void *astarg, int mode))
1936 {
1937         struct dlm_ls *ls;
1938         struct dlm_lkb *lkb;
1939         struct dlm_args args;
1940         int error, convert = flags & DLM_LKF_CONVERT;
1941
1942         ls = dlm_find_lockspace_local(lockspace);
1943         if (!ls)
1944                 return -EINVAL;
1945
1946         lock_recovery(ls);
1947
1948         if (convert)
1949                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1950         else
1951                 error = create_lkb(ls, &lkb);
1952
1953         if (error)
1954                 goto out;
1955
1956         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1957                               astarg, bast, &args);
1958         if (error)
1959                 goto out_put;
1960
1961         if (convert)
1962                 error = convert_lock(ls, lkb, &args);
1963         else
1964                 error = request_lock(ls, lkb, name, namelen, &args);
1965
1966         if (error == -EINPROGRESS)
1967                 error = 0;
1968  out_put:
1969         if (convert || error)
1970                 __put_lkb(ls, lkb);
1971         if (error == -EAGAIN)
1972                 error = 0;
1973  out:
1974         unlock_recovery(ls);
1975         dlm_put_lockspace(ls);
1976         return error;
1977 }
1978
1979 int dlm_unlock(dlm_lockspace_t *lockspace,
1980                uint32_t lkid,
1981                uint32_t flags,
1982                struct dlm_lksb *lksb,
1983                void *astarg)
1984 {
1985         struct dlm_ls *ls;
1986         struct dlm_lkb *lkb;
1987         struct dlm_args args;
1988         int error;
1989
1990         ls = dlm_find_lockspace_local(lockspace);
1991         if (!ls)
1992                 return -EINVAL;
1993
1994         lock_recovery(ls);
1995
1996         error = find_lkb(ls, lkid, &lkb);
1997         if (error)
1998                 goto out;
1999
2000         error = set_unlock_args(flags, astarg, &args);
2001         if (error)
2002                 goto out_put;
2003
2004         if (flags & DLM_LKF_CANCEL)
2005                 error = cancel_lock(ls, lkb, &args);
2006         else
2007                 error = unlock_lock(ls, lkb, &args);
2008
2009         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2010                 error = 0;
2011  out_put:
2012         dlm_put_lkb(lkb);
2013  out:
2014         unlock_recovery(ls);
2015         dlm_put_lockspace(ls);
2016         return error;
2017 }
2018
2019 /*
2020  * send/receive routines for remote operations and replies
2021  *
2022  * send_args
2023  * send_common
2024  * send_request                 receive_request
2025  * send_convert                 receive_convert
2026  * send_unlock                  receive_unlock
2027  * send_cancel                  receive_cancel
2028  * send_grant                   receive_grant
2029  * send_bast                    receive_bast
2030  * send_lookup                  receive_lookup
2031  * send_remove                  receive_remove
2032  *
2033  *                              send_common_reply
2034  * receive_request_reply        send_request_reply
2035  * receive_convert_reply        send_convert_reply
2036  * receive_unlock_reply         send_unlock_reply
2037  * receive_cancel_reply         send_cancel_reply
2038  * receive_lookup_reply         send_lookup_reply
2039  */
2040
2041 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2042                           int to_nodeid, int mstype,
2043                           struct dlm_message **ms_ret,
2044                           struct dlm_mhandle **mh_ret)
2045 {
2046         struct dlm_message *ms;
2047         struct dlm_mhandle *mh;
2048         char *mb;
2049         int mb_len = sizeof(struct dlm_message);
2050
2051         switch (mstype) {
2052         case DLM_MSG_REQUEST:
2053         case DLM_MSG_LOOKUP:
2054         case DLM_MSG_REMOVE:
2055                 mb_len += r->res_length;
2056                 break;
2057         case DLM_MSG_CONVERT:
2058         case DLM_MSG_UNLOCK:
2059         case DLM_MSG_REQUEST_REPLY:
2060         case DLM_MSG_CONVERT_REPLY:
2061         case DLM_MSG_GRANT:
2062                 if (lkb && lkb->lkb_lvbptr)
2063                         mb_len += r->res_ls->ls_lvblen;
2064                 break;
2065         }
2066
2067         /* get_buffer gives us a message handle (mh) that we need to
2068            pass into lowcomms_commit and a message buffer (mb) that we
2069            write our data into */
2070
2071         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2072         if (!mh)
2073                 return -ENOBUFS;
2074
2075         memset(mb, 0, mb_len);
2076
2077         ms = (struct dlm_message *) mb;
2078
2079         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2080         ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2081         ms->m_header.h_nodeid = dlm_our_nodeid();
2082         ms->m_header.h_length = mb_len;
2083         ms->m_header.h_cmd = DLM_MSG;
2084
2085         ms->m_type = mstype;
2086
2087         *mh_ret = mh;
2088         *ms_ret = ms;
2089         return 0;
2090 }
2091
2092 /* further lowcomms enhancements or alternate implementations may make
2093    the return value from this function useful at some point */
2094
2095 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2096 {
2097         dlm_message_out(ms);
2098         dlm_lowcomms_commit_buffer(mh);
2099         return 0;
2100 }
2101
2102 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2103                       struct dlm_message *ms)
2104 {
2105         ms->m_nodeid   = lkb->lkb_nodeid;
2106         ms->m_pid      = lkb->lkb_ownpid;
2107         ms->m_lkid     = lkb->lkb_id;
2108         ms->m_remid    = lkb->lkb_remid;
2109         ms->m_exflags  = lkb->lkb_exflags;
2110         ms->m_sbflags  = lkb->lkb_sbflags;
2111         ms->m_flags    = lkb->lkb_flags;
2112         ms->m_lvbseq   = lkb->lkb_lvbseq;
2113         ms->m_status   = lkb->lkb_status;
2114         ms->m_grmode   = lkb->lkb_grmode;
2115         ms->m_rqmode   = lkb->lkb_rqmode;
2116         ms->m_hash     = r->res_hash;
2117
2118         /* m_result and m_bastmode are set from function args,
2119            not from lkb fields */
2120
2121         if (lkb->lkb_bastaddr)
2122                 ms->m_asts |= AST_BAST;
2123         if (lkb->lkb_astaddr)
2124                 ms->m_asts |= AST_COMP;
2125
2126         if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2127                 memcpy(ms->m_extra, r->res_name, r->res_length);
2128
2129         else if (lkb->lkb_lvbptr)
2130                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2131
2132 }
2133
2134 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2135 {
2136         struct dlm_message *ms;
2137         struct dlm_mhandle *mh;
2138         int to_nodeid, error;
2139
2140         add_to_waiters(lkb, mstype);
2141
2142         to_nodeid = r->res_nodeid;
2143
2144         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2145         if (error)
2146                 goto fail;
2147
2148         send_args(r, lkb, ms);
2149
2150         error = send_message(mh, ms);
2151         if (error)
2152                 goto fail;
2153         return 0;
2154
2155  fail:
2156         remove_from_waiters(lkb);
2157         return error;
2158 }
2159
2160 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2161 {
2162         return send_common(r, lkb, DLM_MSG_REQUEST);
2163 }
2164
2165 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2166 {
2167         int error;
2168
2169         error = send_common(r, lkb, DLM_MSG_CONVERT);
2170
2171         /* down conversions go without a reply from the master */
2172         if (!error && down_conversion(lkb)) {
2173                 remove_from_waiters(lkb);
2174                 r->res_ls->ls_stub_ms.m_result = 0;
2175                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2176         }
2177
2178         return error;
2179 }
2180
2181 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2182    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2183    that the master is still correct. */
2184
2185 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2186 {
2187         return send_common(r, lkb, DLM_MSG_UNLOCK);
2188 }
2189
2190 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2191 {
2192         return send_common(r, lkb, DLM_MSG_CANCEL);
2193 }
2194
2195 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2196 {
2197         struct dlm_message *ms;
2198         struct dlm_mhandle *mh;
2199         int to_nodeid, error;
2200
2201         to_nodeid = lkb->lkb_nodeid;
2202
2203         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2204         if (error)
2205                 goto out;
2206
2207         send_args(r, lkb, ms);
2208
2209         ms->m_result = 0;
2210
2211         error = send_message(mh, ms);
2212  out:
2213         return error;
2214 }
2215
2216 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2217 {
2218         struct dlm_message *ms;
2219         struct dlm_mhandle *mh;
2220         int to_nodeid, error;
2221
2222         to_nodeid = lkb->lkb_nodeid;
2223
2224         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2225         if (error)
2226                 goto out;
2227
2228         send_args(r, lkb, ms);
2229
2230         ms->m_bastmode = mode;
2231
2232         error = send_message(mh, ms);
2233  out:
2234         return error;
2235 }
2236
2237 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2238 {
2239         struct dlm_message *ms;
2240         struct dlm_mhandle *mh;
2241         int to_nodeid, error;
2242
2243         add_to_waiters(lkb, DLM_MSG_LOOKUP);
2244
2245         to_nodeid = dlm_dir_nodeid(r);
2246
2247         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2248         if (error)
2249                 goto fail;
2250
2251         send_args(r, lkb, ms);
2252
2253         error = send_message(mh, ms);
2254         if (error)
2255                 goto fail;
2256         return 0;
2257
2258  fail:
2259         remove_from_waiters(lkb);
2260         return error;
2261 }
2262
2263 static int send_remove(struct dlm_rsb *r)
2264 {
2265         struct dlm_message *ms;
2266         struct dlm_mhandle *mh;
2267         int to_nodeid, error;
2268
2269         to_nodeid = dlm_dir_nodeid(r);
2270
2271         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2272         if (error)
2273                 goto out;
2274
2275         memcpy(ms->m_extra, r->res_name, r->res_length);
2276         ms->m_hash = r->res_hash;
2277
2278         error = send_message(mh, ms);
2279  out:
2280         return error;
2281 }
2282
2283 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2284                              int mstype, int rv)
2285 {
2286         struct dlm_message *ms;
2287         struct dlm_mhandle *mh;
2288         int to_nodeid, error;
2289
2290         to_nodeid = lkb->lkb_nodeid;
2291
2292         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2293         if (error)
2294                 goto out;
2295
2296         send_args(r, lkb, ms);
2297
2298         ms->m_result = rv;
2299
2300         error = send_message(mh, ms);
2301  out:
2302         return error;
2303 }
2304
2305 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2306 {
2307         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2308 }
2309
2310 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2311 {
2312         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2313 }
2314
2315 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2316 {
2317         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2318 }
2319
2320 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2321 {
2322         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2323 }
2324
2325 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2326                              int ret_nodeid, int rv)
2327 {
2328         struct dlm_rsb *r = &ls->ls_stub_rsb;
2329         struct dlm_message *ms;
2330         struct dlm_mhandle *mh;
2331         int error, nodeid = ms_in->m_header.h_nodeid;
2332
2333         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2334         if (error)
2335                 goto out;
2336
2337         ms->m_lkid = ms_in->m_lkid;
2338         ms->m_result = rv;
2339         ms->m_nodeid = ret_nodeid;
2340
2341         error = send_message(mh, ms);
2342  out:
2343         return error;
2344 }
2345
2346 /* which args we save from a received message depends heavily on the type
2347    of message, unlike the send side where we can safely send everything about
2348    the lkb for any type of message */
2349
2350 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2351 {
2352         lkb->lkb_exflags = ms->m_exflags;
2353         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2354                          (ms->m_flags & 0x0000FFFF);
2355 }
2356
2357 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2358 {
2359         lkb->lkb_sbflags = ms->m_sbflags;
2360         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2361                          (ms->m_flags & 0x0000FFFF);
2362 }
2363
2364 static int receive_extralen(struct dlm_message *ms)
2365 {
2366         return (ms->m_header.h_length - sizeof(struct dlm_message));
2367 }
2368
2369 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2370                        struct dlm_message *ms)
2371 {
2372         int len;
2373
2374         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2375                 if (!lkb->lkb_lvbptr)
2376                         lkb->lkb_lvbptr = allocate_lvb(ls);
2377                 if (!lkb->lkb_lvbptr)
2378                         return -ENOMEM;
2379                 len = receive_extralen(ms);
2380                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2381         }
2382         return 0;
2383 }
2384
2385 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2386                                 struct dlm_message *ms)
2387 {
2388         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2389         lkb->lkb_ownpid = ms->m_pid;
2390         lkb->lkb_remid = ms->m_lkid;
2391         lkb->lkb_grmode = DLM_LOCK_IV;
2392         lkb->lkb_rqmode = ms->m_rqmode;
2393         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2394         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2395
2396         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2397
2398         if (receive_lvb(ls, lkb, ms))
2399                 return -ENOMEM;
2400
2401         return 0;
2402 }
2403
2404 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2405                                 struct dlm_message *ms)
2406 {
2407         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2408                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2409                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2410                           lkb->lkb_id, lkb->lkb_remid);
2411                 return -EINVAL;
2412         }
2413
2414         if (!is_master_copy(lkb))
2415                 return -EINVAL;
2416
2417         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2418                 return -EBUSY;
2419
2420         if (receive_lvb(ls, lkb, ms))
2421                 return -ENOMEM;
2422
2423         lkb->lkb_rqmode = ms->m_rqmode;
2424         lkb->lkb_lvbseq = ms->m_lvbseq;
2425
2426         return 0;
2427 }
2428
2429 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2430                                struct dlm_message *ms)
2431 {
2432         if (!is_master_copy(lkb))
2433                 return -EINVAL;
2434         if (receive_lvb(ls, lkb, ms))
2435                 return -ENOMEM;
2436         return 0;
2437 }
2438
2439 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2440    uses to send a reply and that the remote end uses to process the reply. */
2441
2442 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2443 {
2444         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2445         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2446         lkb->lkb_remid = ms->m_lkid;
2447 }
2448
2449 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2450 {
2451         struct dlm_lkb *lkb;
2452         struct dlm_rsb *r;
2453         int error, namelen;
2454
2455         error = create_lkb(ls, &lkb);
2456         if (error)
2457                 goto fail;
2458
2459         receive_flags(lkb, ms);
2460         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2461         error = receive_request_args(ls, lkb, ms);
2462         if (error) {
2463                 __put_lkb(ls, lkb);
2464                 goto fail;
2465         }
2466
2467         namelen = receive_extralen(ms);
2468
2469         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2470         if (error) {
2471                 __put_lkb(ls, lkb);
2472                 goto fail;
2473         }
2474
2475         lock_rsb(r);
2476
2477         attach_lkb(r, lkb);
2478         error = do_request(r, lkb);
2479         send_request_reply(r, lkb, error);
2480
2481         unlock_rsb(r);
2482         put_rsb(r);
2483
2484         if (error == -EINPROGRESS)
2485                 error = 0;
2486         if (error)
2487                 dlm_put_lkb(lkb);
2488         return;
2489
2490  fail:
2491         setup_stub_lkb(ls, ms);
2492         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2493 }
2494
2495 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2496 {
2497         struct dlm_lkb *lkb;
2498         struct dlm_rsb *r;
2499         int error, reply = 1;
2500
2501         error = find_lkb(ls, ms->m_remid, &lkb);
2502         if (error)
2503                 goto fail;
2504
2505         r = lkb->lkb_resource;
2506
2507         hold_rsb(r);
2508         lock_rsb(r);
2509
2510         receive_flags(lkb, ms);
2511         error = receive_convert_args(ls, lkb, ms);
2512         if (error)
2513                 goto out;
2514         reply = !down_conversion(lkb);
2515
2516         error = do_convert(r, lkb);
2517  out:
2518         if (reply)
2519                 send_convert_reply(r, lkb, error);
2520
2521         unlock_rsb(r);
2522         put_rsb(r);
2523         dlm_put_lkb(lkb);
2524         return;
2525
2526  fail:
2527         setup_stub_lkb(ls, ms);
2528         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2529 }
2530
2531 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2532 {
2533         struct dlm_lkb *lkb;
2534         struct dlm_rsb *r;
2535         int error;
2536
2537         error = find_lkb(ls, ms->m_remid, &lkb);
2538         if (error)
2539                 goto fail;
2540
2541         r = lkb->lkb_resource;
2542
2543         hold_rsb(r);
2544         lock_rsb(r);
2545
2546         receive_flags(lkb, ms);
2547         error = receive_unlock_args(ls, lkb, ms);
2548         if (error)
2549                 goto out;
2550
2551         error = do_unlock(r, lkb);
2552  out:
2553         send_unlock_reply(r, lkb, error);
2554
2555         unlock_rsb(r);
2556         put_rsb(r);
2557         dlm_put_lkb(lkb);
2558         return;
2559
2560  fail:
2561         setup_stub_lkb(ls, ms);
2562         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2563 }
2564
2565 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2566 {
2567         struct dlm_lkb *lkb;
2568         struct dlm_rsb *r;
2569         int error;
2570
2571         error = find_lkb(ls, ms->m_remid, &lkb);
2572         if (error)
2573                 goto fail;
2574
2575         receive_flags(lkb, ms);
2576
2577         r = lkb->lkb_resource;
2578
2579         hold_rsb(r);
2580         lock_rsb(r);
2581
2582         error = do_cancel(r, lkb);
2583         send_cancel_reply(r, lkb, error);
2584
2585         unlock_rsb(r);
2586         put_rsb(r);
2587         dlm_put_lkb(lkb);
2588         return;
2589
2590  fail:
2591         setup_stub_lkb(ls, ms);
2592         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2593 }
2594
2595 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2596 {
2597         struct dlm_lkb *lkb;
2598         struct dlm_rsb *r;
2599         int error;
2600
2601         error = find_lkb(ls, ms->m_remid, &lkb);
2602         if (error) {
2603                 log_error(ls, "receive_grant no lkb");
2604                 return;
2605         }
2606         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2607
2608         r = lkb->lkb_resource;
2609
2610         hold_rsb(r);
2611         lock_rsb(r);
2612
2613         receive_flags_reply(lkb, ms);
2614         grant_lock_pc(r, lkb, ms);
2615         queue_cast(r, lkb, 0);
2616
2617         unlock_rsb(r);
2618         put_rsb(r);
2619         dlm_put_lkb(lkb);
2620 }
2621
2622 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2623 {
2624         struct dlm_lkb *lkb;
2625         struct dlm_rsb *r;
2626         int error;
2627
2628         error = find_lkb(ls, ms->m_remid, &lkb);
2629         if (error) {
2630                 log_error(ls, "receive_bast no lkb");
2631                 return;
2632         }
2633         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2634
2635         r = lkb->lkb_resource;
2636
2637         hold_rsb(r);
2638         lock_rsb(r);
2639
2640         queue_bast(r, lkb, ms->m_bastmode);
2641
2642         unlock_rsb(r);
2643         put_rsb(r);
2644         dlm_put_lkb(lkb);
2645 }
2646
2647 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2648 {
2649         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2650
2651         from_nodeid = ms->m_header.h_nodeid;
2652         our_nodeid = dlm_our_nodeid();
2653
2654         len = receive_extralen(ms);
2655
2656         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2657         if (dir_nodeid != our_nodeid) {
2658                 log_error(ls, "lookup dir_nodeid %d from %d",
2659                           dir_nodeid, from_nodeid);
2660                 error = -EINVAL;
2661                 ret_nodeid = -1;
2662                 goto out;
2663         }
2664
2665         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2666
2667         /* Optimization: we're master so treat lookup as a request */
2668         if (!error && ret_nodeid == our_nodeid) {
2669                 receive_request(ls, ms);
2670                 return;
2671         }
2672  out:
2673         send_lookup_reply(ls, ms, ret_nodeid, error);
2674 }
2675
2676 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2677 {
2678         int len, dir_nodeid, from_nodeid;
2679
2680         from_nodeid = ms->m_header.h_nodeid;
2681
2682         len = receive_extralen(ms);
2683
2684         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2685         if (dir_nodeid != dlm_our_nodeid()) {
2686                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2687                           dir_nodeid, from_nodeid);
2688                 return;
2689         }
2690
2691         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2692 }
2693
2694 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2695 {
2696         struct dlm_lkb *lkb;
2697         struct dlm_rsb *r;
2698         int error, mstype;
2699
2700         error = find_lkb(ls, ms->m_remid, &lkb);
2701         if (error) {
2702                 log_error(ls, "receive_request_reply no lkb");
2703                 return;
2704         }
2705         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2706
2707         mstype = lkb->lkb_wait_type;
2708         error = remove_from_waiters(lkb);
2709         if (error) {
2710                 log_error(ls, "receive_request_reply not on waiters");
2711                 goto out;
2712         }
2713
2714         /* this is the value returned from do_request() on the master */
2715         error = ms->m_result;
2716
2717         r = lkb->lkb_resource;
2718         hold_rsb(r);
2719         lock_rsb(r);
2720
2721         /* Optimization: the dir node was also the master, so it took our
2722            lookup as a request and sent request reply instead of lookup reply */
2723         if (mstype == DLM_MSG_LOOKUP) {
2724                 r->res_nodeid = ms->m_header.h_nodeid;
2725                 lkb->lkb_nodeid = r->res_nodeid;
2726         }
2727
2728         switch (error) {
2729         case -EAGAIN:
2730                 /* request would block (be queued) on remote master;
2731                    the unhold undoes the original ref from create_lkb()
2732                    so it leads to the lkb being freed */
2733                 queue_cast(r, lkb, -EAGAIN);
2734                 confirm_master(r, -EAGAIN);
2735                 unhold_lkb(lkb);
2736                 break;
2737
2738         case -EINPROGRESS:
2739         case 0:
2740                 /* request was queued or granted on remote master */
2741                 receive_flags_reply(lkb, ms);
2742                 lkb->lkb_remid = ms->m_lkid;
2743                 if (error)
2744                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
2745                 else {
2746                         grant_lock_pc(r, lkb, ms);
2747                         queue_cast(r, lkb, 0);
2748                 }
2749                 confirm_master(r, error);
2750                 break;
2751
2752         case -EBADR:
2753         case -ENOTBLK:
2754                 /* find_rsb failed to find rsb or rsb wasn't master */
2755                 r->res_nodeid = -1;
2756                 lkb->lkb_nodeid = -1;
2757                 _request_lock(r, lkb);
2758                 break;
2759
2760         default:
2761                 log_error(ls, "receive_request_reply error %d", error);
2762         }
2763
2764         unlock_rsb(r);
2765         put_rsb(r);
2766  out:
2767         dlm_put_lkb(lkb);
2768 }
2769
2770 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2771                                     struct dlm_message *ms)
2772 {
2773         int error = ms->m_result;
2774
2775         /* this is the value returned from do_convert() on the master */
2776
2777         switch (error) {
2778         case -EAGAIN:
2779                 /* convert would block (be queued) on remote master */
2780                 queue_cast(r, lkb, -EAGAIN);
2781                 break;
2782
2783         case -EINPROGRESS:
2784                 /* convert was queued on remote master */
2785                 del_lkb(r, lkb);
2786                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2787                 break;
2788
2789         case 0:
2790                 /* convert was granted on remote master */
2791                 receive_flags_reply(lkb, ms);
2792                 grant_lock_pc(r, lkb, ms);
2793                 queue_cast(r, lkb, 0);
2794                 break;
2795
2796         default:
2797                 log_error(r->res_ls, "receive_convert_reply error %d", error);
2798         }
2799 }
2800
2801 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2802 {
2803         struct dlm_rsb *r = lkb->lkb_resource;
2804
2805         hold_rsb(r);
2806         lock_rsb(r);
2807
2808         __receive_convert_reply(r, lkb, ms);
2809
2810         unlock_rsb(r);
2811         put_rsb(r);
2812 }
2813
2814 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2815 {
2816         struct dlm_lkb *lkb;
2817         int error;
2818
2819         error = find_lkb(ls, ms->m_remid, &lkb);
2820         if (error) {
2821                 log_error(ls, "receive_convert_reply no lkb");
2822                 return;
2823         }
2824         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2825
2826         error = remove_from_waiters(lkb);
2827         if (error) {
2828                 log_error(ls, "receive_convert_reply not on waiters");
2829                 goto out;
2830         }
2831
2832         _receive_convert_reply(lkb, ms);
2833  out:
2834         dlm_put_lkb(lkb);
2835 }
2836
2837 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2838 {
2839         struct dlm_rsb *r = lkb->lkb_resource;
2840         int error = ms->m_result;
2841
2842         hold_rsb(r);
2843         lock_rsb(r);
2844
2845         /* this is the value returned from do_unlock() on the master */
2846
2847         switch (error) {
2848         case -DLM_EUNLOCK:
2849                 receive_flags_reply(lkb, ms);
2850                 remove_lock_pc(r, lkb);
2851                 queue_cast(r, lkb, -DLM_EUNLOCK);
2852                 break;
2853         default:
2854                 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2855         }
2856
2857         unlock_rsb(r);
2858         put_rsb(r);
2859 }
2860
2861 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2862 {
2863         struct dlm_lkb *lkb;
2864         int error;
2865
2866         error = find_lkb(ls, ms->m_remid, &lkb);
2867         if (error) {
2868                 log_error(ls, "receive_unlock_reply no lkb");
2869                 return;
2870         }
2871         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2872
2873         error = remove_from_waiters(lkb);
2874         if (error) {
2875                 log_error(ls, "receive_unlock_reply not on waiters");
2876                 goto out;
2877         }
2878
2879         _receive_unlock_reply(lkb, ms);
2880  out:
2881         dlm_put_lkb(lkb);
2882 }
2883
2884 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2885 {
2886         struct dlm_rsb *r = lkb->lkb_resource;
2887         int error = ms->m_result;
2888
2889         hold_rsb(r);
2890         lock_rsb(r);
2891
2892         /* this is the value returned from do_cancel() on the master */
2893
2894         switch (error) {
2895         case -DLM_ECANCEL:
2896                 receive_flags_reply(lkb, ms);
2897                 revert_lock_pc(r, lkb);
2898                 queue_cast(r, lkb, -DLM_ECANCEL);
2899                 break;
2900         default:
2901                 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2902         }
2903
2904         unlock_rsb(r);
2905         put_rsb(r);
2906 }
2907
2908 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2909 {
2910         struct dlm_lkb *lkb;
2911         int error;
2912
2913         error = find_lkb(ls, ms->m_remid, &lkb);
2914         if (error) {
2915                 log_error(ls, "receive_cancel_reply no lkb");
2916                 return;
2917         }
2918         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2919
2920         error = remove_from_waiters(lkb);
2921         if (error) {
2922                 log_error(ls, "receive_cancel_reply not on waiters");
2923                 goto out;
2924         }
2925
2926         _receive_cancel_reply(lkb, ms);
2927  out:
2928         dlm_put_lkb(lkb);
2929 }
2930
2931 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2932 {
2933         struct dlm_lkb *lkb;
2934         struct dlm_rsb *r;
2935         int error, ret_nodeid;
2936
2937         error = find_lkb(ls, ms->m_lkid, &lkb);
2938         if (error) {
2939                 log_error(ls, "receive_lookup_reply no lkb");
2940                 return;
2941         }
2942
2943         error = remove_from_waiters(lkb);
2944         if (error) {
2945                 log_error(ls, "receive_lookup_reply not on waiters");
2946                 goto out;
2947         }
2948
2949         /* this is the value returned by dlm_dir_lookup on dir node
2950            FIXME: will a non-zero error ever be returned? */
2951         error = ms->m_result;
2952
2953         r = lkb->lkb_resource;
2954         hold_rsb(r);
2955         lock_rsb(r);
2956
2957         ret_nodeid = ms->m_nodeid;
2958         if (ret_nodeid == dlm_our_nodeid()) {
2959                 r->res_nodeid = 0;
2960                 ret_nodeid = 0;
2961                 r->res_first_lkid = 0;
2962         } else {
2963                 /* set_master() will copy res_nodeid to lkb_nodeid */
2964                 r->res_nodeid = ret_nodeid;
2965         }
2966
2967         _request_lock(r, lkb);
2968
2969         if (!ret_nodeid)
2970                 process_lookup_list(r);
2971
2972         unlock_rsb(r);
2973         put_rsb(r);
2974  out:
2975         dlm_put_lkb(lkb);
2976 }
2977
2978 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2979 {
2980         struct dlm_message *ms = (struct dlm_message *) hd;
2981         struct dlm_ls *ls;
2982         int error;
2983
2984         if (!recovery)
2985                 dlm_message_in(ms);
2986
2987         ls = dlm_find_lockspace_global(hd->h_lockspace);
2988         if (!ls) {
2989                 log_print("drop message %d from %d for unknown lockspace %d",
2990                           ms->m_type, nodeid, hd->h_lockspace);
2991                 return -EINVAL;
2992         }
2993
2994         /* recovery may have just ended leaving a bunch of backed-up requests
2995            in the requestqueue; wait while dlm_recoverd clears them */
2996
2997         if (!recovery)
2998                 dlm_wait_requestqueue(ls);
2999
3000         /* recovery may have just started while there were a bunch of
3001            in-flight requests -- save them in requestqueue to be processed
3002            after recovery.  we can't let dlm_recvd block on the recovery
3003            lock.  if dlm_recoverd is calling this function to clear the
3004            requestqueue, it needs to be interrupted (-EINTR) if another
3005            recovery operation is starting. */
3006
3007         while (1) {
3008                 if (dlm_locking_stopped(ls)) {
3009                         if (!recovery)
3010                                 dlm_add_requestqueue(ls, nodeid, hd);
3011                         error = -EINTR;
3012                         goto out;
3013                 }
3014
3015                 if (lock_recovery_try(ls))
3016                         break;
3017                 schedule();
3018         }
3019
3020         switch (ms->m_type) {
3021
3022         /* messages sent to a master node */
3023
3024         case DLM_MSG_REQUEST:
3025                 receive_request(ls, ms);
3026                 break;
3027
3028         case DLM_MSG_CONVERT:
3029                 receive_convert(ls, ms);
3030                 break;
3031
3032         case DLM_MSG_UNLOCK:
3033                 receive_unlock(ls, ms);
3034                 break;
3035
3036         case DLM_MSG_CANCEL:
3037                 receive_cancel(ls, ms);
3038                 break;
3039
3040         /* messages sent from a master node (replies to above) */
3041
3042         case DLM_MSG_REQUEST_REPLY:
3043                 receive_request_reply(ls, ms);
3044                 break;
3045
3046         case DLM_MSG_CONVERT_REPLY:
3047                 receive_convert_reply(ls, ms);
3048                 break;
3049
3050         case DLM_MSG_UNLOCK_REPLY:
3051                 receive_unlock_reply(ls, ms);
3052                 break;
3053
3054         case DLM_MSG_CANCEL_REPLY:
3055                 receive_cancel_reply(ls, ms);
3056                 break;
3057
3058         /* messages sent from a master node (only two types of async msg) */
3059
3060         case DLM_MSG_GRANT:
3061                 receive_grant(ls, ms);
3062                 break;
3063
3064         case DLM_MSG_BAST:
3065                 receive_bast(ls, ms);
3066                 break;
3067
3068         /* messages sent to a dir node */
3069
3070         case DLM_MSG_LOOKUP:
3071                 receive_lookup(ls, ms);
3072                 break;
3073
3074         case DLM_MSG_REMOVE:
3075                 receive_remove(ls, ms);
3076                 break;
3077
3078         /* messages sent from a dir node (remove has no reply) */
3079
3080         case DLM_MSG_LOOKUP_REPLY:
3081                 receive_lookup_reply(ls, ms);
3082                 break;
3083
3084         default:
3085                 log_error(ls, "unknown message type %d", ms->m_type);
3086         }
3087
3088         unlock_recovery(ls);
3089  out:
3090         dlm_put_lockspace(ls);
3091         dlm_astd_wake();
3092         return 0;
3093 }
3094
3095
3096 /*
3097  * Recovery related
3098  */
3099
3100 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3101 {
3102         if (middle_conversion(lkb)) {
3103                 hold_lkb(lkb);
3104                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3105                 _remove_from_waiters(lkb);
3106                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3107
3108                 /* Same special case as in receive_rcom_lock_args() */
3109                 lkb->lkb_grmode = DLM_LOCK_IV;
3110                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3111                 unhold_lkb(lkb);
3112
3113         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3114                 lkb->lkb_flags |= DLM_IFL_RESEND;
3115         }
3116
3117         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3118            conversions are async; there's no reply from the remote master */
3119 }
3120
3121 /* A waiting lkb needs recovery if the master node has failed, or
3122    the master node is changing (only when no directory is used) */
3123
3124 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3125 {
3126         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3127                 return 1;
3128
3129         if (!dlm_no_directory(ls))
3130                 return 0;
3131
3132         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3133                 return 1;
3134
3135         return 0;
3136 }
3137
3138 /* Recovery for locks that are waiting for replies from nodes that are now
3139    gone.  We can just complete unlocks and cancels by faking a reply from the
3140    dead node.  Requests and up-conversions we flag to be resent after
3141    recovery.  Down-conversions can just be completed with a fake reply like
3142    unlocks.  Conversions between PR and CW need special attention. */
3143
3144 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3145 {
3146         struct dlm_lkb *lkb, *safe;
3147
3148         mutex_lock(&ls->ls_waiters_mutex);
3149
3150         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3151                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3152                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3153
3154                 /* all outstanding lookups, regardless of destination  will be
3155                    resent after recovery is done */
3156
3157                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3158                         lkb->lkb_flags |= DLM_IFL_RESEND;
3159                         continue;
3160                 }
3161
3162                 if (!waiter_needs_recovery(ls, lkb))
3163                         continue;
3164
3165                 switch (lkb->lkb_wait_type) {
3166
3167                 case DLM_MSG_REQUEST:
3168                         lkb->lkb_flags |= DLM_IFL_RESEND;
3169                         break;
3170
3171                 case DLM_MSG_CONVERT:
3172                         recover_convert_waiter(ls, lkb);
3173                         break;
3174
3175                 case DLM_MSG_UNLOCK:
3176                         hold_lkb(lkb);
3177                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3178                         _remove_from_waiters(lkb);
3179                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3180                         dlm_put_lkb(lkb);
3181                         break;
3182
3183                 case DLM_MSG_CANCEL:
3184                         hold_lkb(lkb);
3185                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3186                         _remove_from_waiters(lkb);
3187                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3188                         dlm_put_lkb(lkb);
3189                         break;
3190
3191                 default:
3192                         log_error(ls, "invalid lkb wait_type %d",
3193                                   lkb->lkb_wait_type);
3194                 }
3195         }
3196         mutex_unlock(&ls->ls_waiters_mutex);
3197 }
3198
3199 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3200 {
3201         struct dlm_lkb *lkb;
3202         int rv = 0;
3203
3204         mutex_lock(&ls->ls_waiters_mutex);
3205         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3206                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3207                         rv = lkb->lkb_wait_type;
3208                         _remove_from_waiters(lkb);
3209                         lkb->lkb_flags &= ~DLM_IFL_RESEND;
3210                         break;
3211                 }
3212         }
3213         mutex_unlock(&ls->ls_waiters_mutex);
3214
3215         if (!rv)
3216                 lkb = NULL;
3217         *lkb_ret = lkb;
3218         return rv;
3219 }
3220
3221 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3222    master or dir-node for r.  Processing the lkb may result in it being placed
3223    back on waiters. */
3224
3225 int dlm_recover_waiters_post(struct dlm_ls *ls)
3226 {
3227         struct dlm_lkb *lkb;
3228         struct dlm_rsb *r;
3229         int error = 0, mstype;
3230
3231         while (1) {
3232                 if (dlm_locking_stopped(ls)) {
3233                         log_debug(ls, "recover_waiters_post aborted");
3234                         error = -EINTR;
3235                         break;
3236                 }
3237
3238                 mstype = remove_resend_waiter(ls, &lkb);
3239                 if (!mstype)
3240                         break;
3241
3242                 r = lkb->lkb_resource;
3243
3244                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3245                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3246
3247                 switch (mstype) {
3248
3249                 case DLM_MSG_LOOKUP:
3250                         hold_rsb(r);
3251                         lock_rsb(r);
3252                         _request_lock(r, lkb);
3253                         if (is_master(r))
3254                                 confirm_master(r, 0);
3255                         unlock_rsb(r);
3256                         put_rsb(r);
3257                         break;
3258
3259                 case DLM_MSG_REQUEST:
3260                         hold_rsb(r);
3261                         lock_rsb(r);
3262                         _request_lock(r, lkb);
3263                         unlock_rsb(r);
3264                         put_rsb(r);
3265                         break;
3266
3267                 case DLM_MSG_CONVERT:
3268                         hold_rsb(r);
3269                         lock_rsb(r);
3270                         _convert_lock(r, lkb);
3271                         unlock_rsb(r);
3272                         put_rsb(r);
3273                         break;
3274
3275                 default:
3276                         log_error(ls, "recover_waiters_post type %d", mstype);
3277                 }
3278         }
3279
3280         return error;
3281 }
3282
3283 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3284                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3285 {
3286         struct dlm_ls *ls = r->res_ls;
3287         struct dlm_lkb *lkb, *safe;
3288
3289         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3290                 if (test(ls, lkb)) {
3291                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3292                         del_lkb(r, lkb);
3293                         /* this put should free the lkb */
3294                         if (!dlm_put_lkb(lkb))
3295                                 log_error(ls, "purged lkb not released");
3296                 }
3297         }
3298 }
3299
3300 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3301 {
3302         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3303 }
3304
3305 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3306 {
3307         return is_master_copy(lkb);
3308 }
3309
3310 static void purge_dead_locks(struct dlm_rsb *r)
3311 {
3312         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3313         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3314         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3315 }
3316
3317 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3318 {
3319         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3320         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3321         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3322 }
3323
3324 /* Get rid of locks held by nodes that are gone. */
3325
3326 int dlm_purge_locks(struct dlm_ls *ls)
3327 {
3328         struct dlm_rsb *r;
3329
3330         log_debug(ls, "dlm_purge_locks");
3331
3332         down_write(&ls->ls_root_sem);
3333         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3334                 hold_rsb(r);
3335                 lock_rsb(r);
3336                 if (is_master(r))
3337                         purge_dead_locks(r);
3338                 unlock_rsb(r);
3339                 unhold_rsb(r);
3340
3341                 schedule();
3342         }
3343         up_write(&ls->ls_root_sem);
3344
3345         return 0;
3346 }
3347
3348 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3349 {
3350         struct dlm_rsb *r, *r_ret = NULL;
3351
3352         read_lock(&ls->ls_rsbtbl[bucket].lock);
3353         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3354                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3355                         continue;
3356                 hold_rsb(r);
3357                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3358                 r_ret = r;
3359                 break;
3360         }
3361         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3362         return r_ret;
3363 }
3364
3365 void dlm_grant_after_purge(struct dlm_ls *ls)
3366 {
3367         struct dlm_rsb *r;
3368         int i;
3369
3370         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3371                 r = find_purged_rsb(ls, i);
3372                 if (!r)
3373                         continue;
3374                 lock_rsb(r);
3375                 if (is_master(r)) {
3376                         grant_pending_locks(r);
3377                         confirm_master(r, 0);
3378                 }
3379                 unlock_rsb(r);
3380                 put_rsb(r);
3381         }
3382 }
3383
3384 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3385                                          uint32_t remid)
3386 {
3387         struct dlm_lkb *lkb;
3388
3389         list_for_each_entry(lkb, head, lkb_statequeue) {
3390                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3391                         return lkb;
3392         }
3393         return NULL;
3394 }
3395
3396 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3397                                     uint32_t remid)
3398 {
3399         struct dlm_lkb *lkb;
3400
3401         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3402         if (lkb)
3403                 return lkb;
3404         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3405         if (lkb)
3406                 return lkb;
3407         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3408         if (lkb)
3409                 return lkb;
3410         return NULL;
3411 }
3412
3413 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3414                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3415 {
3416         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3417         int lvblen;
3418
3419         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3420         lkb->lkb_ownpid = rl->rl_ownpid;
3421         lkb->lkb_remid = rl->rl_lkid;
3422         lkb->lkb_exflags = rl->rl_exflags;
3423         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3424         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3425         lkb->lkb_lvbseq = rl->rl_lvbseq;
3426         lkb->lkb_rqmode = rl->rl_rqmode;
3427         lkb->lkb_grmode = rl->rl_grmode;
3428         /* don't set lkb_status because add_lkb wants to itself */
3429
3430         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3431         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3432
3433         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3434                 lkb->lkb_lvbptr = allocate_lvb(ls);
3435                 if (!lkb->lkb_lvbptr)
3436                         return -ENOMEM;
3437                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3438                          sizeof(struct rcom_lock);
3439                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3440         }
3441
3442         /* Conversions between PR and CW (middle modes) need special handling.
3443            The real granted mode of these converting locks cannot be determined
3444            until all locks have been rebuilt on the rsb (recover_conversion) */
3445
3446         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3447                 rl->rl_status = DLM_LKSTS_CONVERT;
3448                 lkb->lkb_grmode = DLM_LOCK_IV;
3449                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3450         }
3451
3452         return 0;
3453 }
3454
3455 /* This lkb may have been recovered in a previous aborted recovery so we need
3456    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3457    If so we just send back a standard reply.  If not, we create a new lkb with
3458    the given values and send back our lkid.  We send back our lkid by sending
3459    back the rcom_lock struct we got but with the remid field filled in. */
3460
3461 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3462 {
3463         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3464         struct dlm_rsb *r;
3465         struct dlm_lkb *lkb;
3466         int error;
3467
3468         if (rl->rl_parent_lkid) {
3469                 error = -EOPNOTSUPP;
3470                 goto out;
3471         }
3472
3473         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3474         if (error)
3475                 goto out;
3476
3477         lock_rsb(r);
3478
3479         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3480         if (lkb) {
3481                 error = -EEXIST;
3482                 goto out_remid;
3483         }
3484
3485         error = create_lkb(ls, &lkb);
3486         if (error)
3487                 goto out_unlock;
3488
3489         error = receive_rcom_lock_args(ls, lkb, r, rc);
3490         if (error) {
3491                 __put_lkb(ls, lkb);
3492                 goto out_unlock;
3493         }
3494
3495         attach_lkb(r, lkb);
3496         add_lkb(r, lkb, rl->rl_status);
3497         error = 0;
3498
3499  out_remid:
3500         /* this is the new value returned to the lock holder for
3501            saving in its process-copy lkb */
3502         rl->rl_remid = lkb->lkb_id;
3503
3504  out_unlock:
3505         unlock_rsb(r);
3506         put_rsb(r);
3507  out:
3508         if (error)
3509                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3510         rl->rl_result = error;
3511         return error;
3512 }
3513
3514 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3515 {
3516         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3517         struct dlm_rsb *r;
3518         struct dlm_lkb *lkb;
3519         int error;
3520
3521         error = find_lkb(ls, rl->rl_lkid, &lkb);
3522         if (error) {
3523                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3524                 return error;
3525         }
3526
3527         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3528
3529         error = rl->rl_result;
3530
3531         r = lkb->lkb_resource;
3532         hold_rsb(r);
3533         lock_rsb(r);
3534
3535         switch (error) {
3536         case -EEXIST:
3537                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3538                 /* fall through */
3539         case 0:
3540                 lkb->lkb_remid = rl->rl_remid;
3541                 break;
3542         default:
3543                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3544                           error, lkb->lkb_id);
3545         }
3546
3547         /* an ack for dlm_recover_locks() which waits for replies from
3548            all the locks it sends to new masters */
3549         dlm_recovered_lock(r);
3550
3551         unlock_rsb(r);
3552         put_rsb(r);
3553         dlm_put_lkb(lkb);
3554
3555         return 0;
3556 }
3557
3558 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3559                      int mode, uint32_t flags, void *name, unsigned int namelen,
3560                      uint32_t parent_lkid)
3561 {
3562         struct dlm_lkb *lkb;
3563         struct dlm_args args;
3564         int error;
3565
3566         lock_recovery(ls);
3567
3568         error = create_lkb(ls, &lkb);
3569         if (error) {
3570                 kfree(ua);
3571                 goto out;
3572         }
3573
3574         if (flags & DLM_LKF_VALBLK) {
3575                 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3576                 if (!ua->lksb.sb_lvbptr) {
3577                         kfree(ua);
3578                         __put_lkb(ls, lkb);
3579                         error = -ENOMEM;
3580                         goto out;
3581                 }
3582         }
3583
3584         /* After ua is attached to lkb it will be freed by free_lkb().
3585            When DLM_IFL_USER is set, the dlm knows that this is a userspace
3586            lock and that lkb_astparam is the dlm_user_args structure. */
3587
3588         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3589                               FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3590         lkb->lkb_flags |= DLM_IFL_USER;
3591         ua->old_mode = DLM_LOCK_IV;
3592
3593         if (error) {
3594                 __put_lkb(ls, lkb);
3595                 goto out;
3596         }
3597
3598         error = request_lock(ls, lkb, name, namelen, &args);
3599
3600         switch (error) {
3601         case 0:
3602                 break;
3603         case -EINPROGRESS:
3604                 error = 0;
3605                 break;
3606         case -EAGAIN:
3607                 error = 0;
3608                 /* fall through */
3609         default:
3610                 __put_lkb(ls, lkb);
3611                 goto out;
3612         }
3613
3614         /* add this new lkb to the per-process list of locks */
3615         spin_lock(&ua->proc->locks_spin);
3616         kref_get(&lkb->lkb_ref);
3617         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3618         spin_unlock(&ua->proc->locks_spin);
3619  out:
3620         unlock_recovery(ls);
3621         return error;
3622 }
3623
3624 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3625                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3626 {
3627         struct dlm_lkb *lkb;
3628         struct dlm_args args;
3629         struct dlm_user_args *ua;
3630         int error;
3631
3632         lock_recovery(ls);
3633
3634         error = find_lkb(ls, lkid, &lkb);
3635         if (error)
3636                 goto out;
3637
3638         /* user can change the params on its lock when it converts it, or
3639            add an lvb that didn't exist before */
3640
3641         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3642
3643         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3644                 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3645                 if (!ua->lksb.sb_lvbptr) {
3646                         error = -ENOMEM;
3647                         goto out_put;
3648                 }
3649         }
3650         if (lvb_in && ua->lksb.sb_lvbptr)
3651                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3652
3653         ua->castparam = ua_tmp->castparam;
3654         ua->castaddr = ua_tmp->castaddr;
3655         ua->bastparam = ua_tmp->bastparam;
3656         ua->bastaddr = ua_tmp->bastaddr;
3657         ua->old_mode = lkb->lkb_grmode;
3658
3659         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3660                               FAKE_USER_AST, &args);
3661         if (error)
3662                 goto out_put;
3663
3664         error = convert_lock(ls, lkb, &args);
3665
3666         if (error == -EINPROGRESS || error == -EAGAIN)
3667                 error = 0;
3668  out_put:
3669         dlm_put_lkb(lkb);
3670  out:
3671         unlock_recovery(ls);
3672         kfree(ua_tmp);
3673         return error;
3674 }
3675
3676 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3677                     uint32_t flags, uint32_t lkid, char *lvb_in)
3678 {
3679         struct dlm_lkb *lkb;
3680         struct dlm_args args;
3681         struct dlm_user_args *ua;
3682         int error;
3683
3684         lock_recovery(ls);
3685
3686         error = find_lkb(ls, lkid, &lkb);
3687         if (error)
3688                 goto out;
3689
3690         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3691
3692         if (lvb_in && ua->lksb.sb_lvbptr)
3693                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3694         ua->castparam = ua_tmp->castparam;
3695
3696         error = set_unlock_args(flags, ua, &args);
3697         if (error)
3698                 goto out_put;
3699
3700         error = unlock_lock(ls, lkb, &args);
3701
3702         if (error == -DLM_EUNLOCK)
3703                 error = 0;
3704         if (error)
3705                 goto out_put;
3706
3707         spin_lock(&ua->proc->locks_spin);
3708         list_del(&lkb->lkb_ownqueue);
3709         spin_unlock(&ua->proc->locks_spin);
3710
3711         /* this removes the reference for the proc->locks list added by
3712            dlm_user_request */
3713         unhold_lkb(lkb);
3714  out_put:
3715         dlm_put_lkb(lkb);
3716  out:
3717         unlock_recovery(ls);
3718         return error;
3719 }
3720
3721 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3722                     uint32_t flags, uint32_t lkid)
3723 {
3724         struct dlm_lkb *lkb;
3725         struct dlm_args args;
3726         struct dlm_user_args *ua;
3727         int error;
3728
3729         lock_recovery(ls);
3730
3731         error = find_lkb(ls, lkid, &lkb);
3732         if (error)
3733                 goto out;
3734
3735         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3736         ua->castparam = ua_tmp->castparam;
3737
3738         error = set_unlock_args(flags, ua, &args);
3739         if (error)
3740                 goto out_put;
3741
3742         error = cancel_lock(ls, lkb, &args);
3743
3744         if (error == -DLM_ECANCEL)
3745                 error = 0;
3746         if (error)
3747                 goto out_put;
3748
3749         /* this lkb was removed from the WAITING queue */
3750         if (lkb->lkb_grmode == DLM_LOCK_IV) {
3751                 spin_lock(&ua->proc->locks_spin);
3752                 list_del(&lkb->lkb_ownqueue);
3753                 spin_unlock(&ua->proc->locks_spin);
3754                 unhold_lkb(lkb);
3755         }
3756  out_put:
3757         dlm_put_lkb(lkb);
3758  out:
3759         unlock_recovery(ls);
3760         return error;
3761 }
3762
3763 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3764 {
3765         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3766
3767         if (ua->lksb.sb_lvbptr)
3768                 kfree(ua->lksb.sb_lvbptr);
3769         kfree(ua);
3770         lkb->lkb_astparam = (long)NULL;
3771
3772         /* TODO: propogate to master if needed */
3773         return 0;
3774 }
3775
3776 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3777    Regardless of what rsb queue the lock is on, it's removed and freed. */
3778
3779 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3780 {
3781         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3782         struct dlm_args args;
3783         int error;
3784
3785         /* FIXME: we need to handle the case where the lkb is in limbo
3786            while the rsb is being looked up, currently we assert in
3787            _unlock_lock/is_remote because rsb nodeid is -1. */
3788
3789         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3790
3791         error = unlock_lock(ls, lkb, &args);
3792         if (error == -DLM_EUNLOCK)
3793                 error = 0;
3794         return error;
3795 }
3796
3797 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3798    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3799    which we clear here. */
3800
3801 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3802    list, and no more device_writes should add lkb's to proc->locks list; so we
3803    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3804    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3805    them ourself. */
3806
3807 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3808 {
3809         struct dlm_lkb *lkb, *safe;
3810
3811         lock_recovery(ls);
3812         mutex_lock(&ls->ls_clear_proc_locks);
3813
3814         list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3815                 if (lkb->lkb_ast_type) {
3816                         list_del(&lkb->lkb_astqueue);
3817                         unhold_lkb(lkb);
3818                 }
3819
3820                 list_del(&lkb->lkb_ownqueue);
3821
3822                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3823                         lkb->lkb_flags |= DLM_IFL_ORPHAN;
3824                         orphan_proc_lock(ls, lkb);
3825                 } else {
3826                         lkb->lkb_flags |= DLM_IFL_DEAD;
3827                         unlock_proc_lock(ls, lkb);
3828                 }
3829
3830                 /* this removes the reference for the proc->locks list
3831                    added by dlm_user_request, it may result in the lkb
3832                    being freed */
3833
3834                 dlm_put_lkb(lkb);
3835         }
3836         mutex_unlock(&ls->ls_clear_proc_locks);
3837         unlock_recovery(ls);
3838 }