]> err.no Git - linux-2.6/blob - fs/ocfs2/dlm/dlmmaster.c
ocfs2: update lvb immediately during recovery
[linux-2.6] / fs / ocfs2 / dlm / dlmmaster.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmod.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  *
25  */
26
27
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/spinlock.h>
41 #include <linux/delay.h>
42
43
44 #include "cluster/heartbeat.h"
45 #include "cluster/nodemanager.h"
46 #include "cluster/tcp.h"
47
48 #include "dlmapi.h"
49 #include "dlmcommon.h"
50 #include "dlmdebug.h"
51 #include "dlmdomain.h"
52
53 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
54 #include "cluster/masklog.h"
55
56 enum dlm_mle_type {
57         DLM_MLE_BLOCK,
58         DLM_MLE_MASTER,
59         DLM_MLE_MIGRATION
60 };
61
62 struct dlm_lock_name
63 {
64         u8 len;
65         u8 name[DLM_LOCKID_NAME_MAX];
66 };
67
68 struct dlm_master_list_entry
69 {
70         struct list_head list;
71         struct list_head hb_events;
72         struct dlm_ctxt *dlm;
73         spinlock_t spinlock;
74         wait_queue_head_t wq;
75         atomic_t woken;
76         struct kref mle_refs;
77         int inuse;
78         unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79         unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
80         unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
81         unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
82         u8 master;
83         u8 new_master;
84         enum dlm_mle_type type;
85         struct o2hb_callback_func mle_hb_up;
86         struct o2hb_callback_func mle_hb_down;
87         union {
88                 struct dlm_lock_resource *res;
89                 struct dlm_lock_name name;
90         } u;
91 };
92
93 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
94                               struct dlm_master_list_entry *mle,
95                               struct o2nm_node *node,
96                               int idx);
97 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
98                             struct dlm_master_list_entry *mle,
99                             struct o2nm_node *node,
100                             int idx);
101
102 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
103 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
104                                 unsigned int namelen, void *nodemap,
105                                 u32 flags);
106
107 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
108                                 struct dlm_master_list_entry *mle,
109                                 const char *name,
110                                 unsigned int namelen)
111 {
112         struct dlm_lock_resource *res;
113
114         if (dlm != mle->dlm)
115                 return 0;
116
117         if (mle->type == DLM_MLE_BLOCK ||
118             mle->type == DLM_MLE_MIGRATION) {
119                 if (namelen != mle->u.name.len ||
120                     memcmp(name, mle->u.name.name, namelen)!=0)
121                         return 0;
122         } else {
123                 res = mle->u.res;
124                 if (namelen != res->lockname.len ||
125                     memcmp(res->lockname.name, name, namelen) != 0)
126                         return 0;
127         }
128         return 1;
129 }
130
131 #if 0
132 /* Code here is included but defined out as it aids debugging */
133
134 #define dlm_print_nodemap(m)  _dlm_print_nodemap(m,#m)
135 void _dlm_print_nodemap(unsigned long *map, const char *mapname)
136 {
137         int i;
138         printk("%s=[ ", mapname);
139         for (i=0; i<O2NM_MAX_NODES; i++)
140                 if (test_bit(i, map))
141                         printk("%d ", i);
142         printk("]");
143 }
144
145 void dlm_print_one_mle(struct dlm_master_list_entry *mle)
146 {
147         int refs;
148         char *type;
149         char attached;
150         u8 master;
151         unsigned int namelen;
152         const char *name;
153         struct kref *k;
154         unsigned long *maybe = mle->maybe_map,
155                       *vote = mle->vote_map,
156                       *resp = mle->response_map,
157                       *node = mle->node_map;
158
159         k = &mle->mle_refs;
160         if (mle->type == DLM_MLE_BLOCK)
161                 type = "BLK";
162         else if (mle->type == DLM_MLE_MASTER)
163                 type = "MAS";
164         else
165                 type = "MIG";
166         refs = atomic_read(&k->refcount);
167         master = mle->master;
168         attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
169
170         if (mle->type != DLM_MLE_MASTER) {
171                 namelen = mle->u.name.len;
172                 name = mle->u.name.name;
173         } else {
174                 namelen = mle->u.res->lockname.len;
175                 name = mle->u.res->lockname.name;
176         }
177
178         mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
179                   namelen, name, type, refs, master, mle->new_master, attached,
180                   mle->inuse);
181         dlm_print_nodemap(maybe);
182         printk(", ");
183         dlm_print_nodemap(vote);
184         printk(", ");
185         dlm_print_nodemap(resp);
186         printk(", ");
187         dlm_print_nodemap(node);
188         printk(", ");
189         printk("\n");
190 }
191
192 static void dlm_dump_mles(struct dlm_ctxt *dlm)
193 {
194         struct dlm_master_list_entry *mle;
195         struct list_head *iter;
196         
197         mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
198         spin_lock(&dlm->master_lock);
199         list_for_each(iter, &dlm->master_list) {
200                 mle = list_entry(iter, struct dlm_master_list_entry, list);
201                 dlm_print_one_mle(mle);
202         }
203         spin_unlock(&dlm->master_lock);
204 }
205
206 int dlm_dump_all_mles(const char __user *data, unsigned int len)
207 {
208         struct list_head *iter;
209         struct dlm_ctxt *dlm;
210
211         spin_lock(&dlm_domain_lock);
212         list_for_each(iter, &dlm_domains) {
213                 dlm = list_entry (iter, struct dlm_ctxt, list);
214                 mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
215                 dlm_dump_mles(dlm);
216         }
217         spin_unlock(&dlm_domain_lock);
218         return len;
219 }
220 EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
221
222 #endif  /*  0  */
223
224
225 static kmem_cache_t *dlm_mle_cache = NULL;
226
227
228 static void dlm_mle_release(struct kref *kref);
229 static void dlm_init_mle(struct dlm_master_list_entry *mle,
230                         enum dlm_mle_type type,
231                         struct dlm_ctxt *dlm,
232                         struct dlm_lock_resource *res,
233                         const char *name,
234                         unsigned int namelen);
235 static void dlm_put_mle(struct dlm_master_list_entry *mle);
236 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
237 static int dlm_find_mle(struct dlm_ctxt *dlm,
238                         struct dlm_master_list_entry **mle,
239                         char *name, unsigned int namelen);
240
241 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
242
243
244 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
245                                      struct dlm_lock_resource *res,
246                                      struct dlm_master_list_entry *mle,
247                                      int *blocked);
248 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
249                                     struct dlm_lock_resource *res,
250                                     struct dlm_master_list_entry *mle,
251                                     int blocked);
252 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
253                                  struct dlm_lock_resource *res,
254                                  struct dlm_master_list_entry *mle,
255                                  struct dlm_master_list_entry **oldmle,
256                                  const char *name, unsigned int namelen,
257                                  u8 new_master, u8 master);
258
259 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
260                                     struct dlm_lock_resource *res);
261 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
262                                       struct dlm_lock_resource *res);
263 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
264                                        struct dlm_lock_resource *res,
265                                        u8 target);
266 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
267                                        struct dlm_lock_resource *res);
268
269
270 int dlm_is_host_down(int errno)
271 {
272         switch (errno) {
273                 case -EBADF:
274                 case -ECONNREFUSED:
275                 case -ENOTCONN:
276                 case -ECONNRESET:
277                 case -EPIPE:
278                 case -EHOSTDOWN:
279                 case -EHOSTUNREACH:
280                 case -ETIMEDOUT:
281                 case -ECONNABORTED:
282                 case -ENETDOWN:
283                 case -ENETUNREACH:
284                 case -ENETRESET:
285                 case -ESHUTDOWN:
286                 case -ENOPROTOOPT:
287                 case -EINVAL:   /* if returned from our tcp code,
288                                    this means there is no socket */
289                         return 1;
290         }
291         return 0;
292 }
293
294
295 /*
296  * MASTER LIST FUNCTIONS
297  */
298
299
300 /*
301  * regarding master list entries and heartbeat callbacks:
302  *
303  * in order to avoid sleeping and allocation that occurs in
304  * heartbeat, master list entries are simply attached to the
305  * dlm's established heartbeat callbacks.  the mle is attached
306  * when it is created, and since the dlm->spinlock is held at
307  * that time, any heartbeat event will be properly discovered
308  * by the mle.  the mle needs to be detached from the
309  * dlm->mle_hb_events list as soon as heartbeat events are no
310  * longer useful to the mle, and before the mle is freed.
311  *
312  * as a general rule, heartbeat events are no longer needed by
313  * the mle once an "answer" regarding the lock master has been
314  * received.
315  */
316 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
317                                               struct dlm_master_list_entry *mle)
318 {
319         assert_spin_locked(&dlm->spinlock);
320
321         list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
322 }
323
324
325 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
326                                               struct dlm_master_list_entry *mle)
327 {
328         if (!list_empty(&mle->hb_events))
329                 list_del_init(&mle->hb_events);
330 }
331
332
333 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
334                                             struct dlm_master_list_entry *mle)
335 {
336         spin_lock(&dlm->spinlock);
337         __dlm_mle_detach_hb_events(dlm, mle);
338         spin_unlock(&dlm->spinlock);
339 }
340
341 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
342 {
343         struct dlm_ctxt *dlm;
344         dlm = mle->dlm;
345
346         assert_spin_locked(&dlm->spinlock);
347         assert_spin_locked(&dlm->master_lock);
348         mle->inuse++;
349         kref_get(&mle->mle_refs);
350 }
351
352 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
353 {
354         struct dlm_ctxt *dlm;
355         dlm = mle->dlm;
356
357         spin_lock(&dlm->spinlock);
358         spin_lock(&dlm->master_lock);
359         mle->inuse--;
360         __dlm_put_mle(mle);
361         spin_unlock(&dlm->master_lock);
362         spin_unlock(&dlm->spinlock);
363
364 }
365
366 /* remove from list and free */
367 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
368 {
369         struct dlm_ctxt *dlm;
370         dlm = mle->dlm;
371
372         assert_spin_locked(&dlm->spinlock);
373         assert_spin_locked(&dlm->master_lock);
374         if (!atomic_read(&mle->mle_refs.refcount)) {
375                 /* this may or may not crash, but who cares.
376                  * it's a BUG. */
377                 mlog(ML_ERROR, "bad mle: %p\n", mle);
378                 dlm_print_one_mle(mle);
379                 BUG();
380         } else
381                 kref_put(&mle->mle_refs, dlm_mle_release);
382 }
383
384
385 /* must not have any spinlocks coming in */
386 static void dlm_put_mle(struct dlm_master_list_entry *mle)
387 {
388         struct dlm_ctxt *dlm;
389         dlm = mle->dlm;
390
391         spin_lock(&dlm->spinlock);
392         spin_lock(&dlm->master_lock);
393         __dlm_put_mle(mle);
394         spin_unlock(&dlm->master_lock);
395         spin_unlock(&dlm->spinlock);
396 }
397
398 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
399 {
400         kref_get(&mle->mle_refs);
401 }
402
403 static void dlm_init_mle(struct dlm_master_list_entry *mle,
404                         enum dlm_mle_type type,
405                         struct dlm_ctxt *dlm,
406                         struct dlm_lock_resource *res,
407                         const char *name,
408                         unsigned int namelen)
409 {
410         assert_spin_locked(&dlm->spinlock);
411
412         mle->dlm = dlm;
413         mle->type = type;
414         INIT_LIST_HEAD(&mle->list);
415         INIT_LIST_HEAD(&mle->hb_events);
416         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
417         spin_lock_init(&mle->spinlock);
418         init_waitqueue_head(&mle->wq);
419         atomic_set(&mle->woken, 0);
420         kref_init(&mle->mle_refs);
421         memset(mle->response_map, 0, sizeof(mle->response_map));
422         mle->master = O2NM_MAX_NODES;
423         mle->new_master = O2NM_MAX_NODES;
424         mle->inuse = 0;
425
426         if (mle->type == DLM_MLE_MASTER) {
427                 BUG_ON(!res);
428                 mle->u.res = res;
429         } else if (mle->type == DLM_MLE_BLOCK) {
430                 BUG_ON(!name);
431                 memcpy(mle->u.name.name, name, namelen);
432                 mle->u.name.len = namelen;
433         } else /* DLM_MLE_MIGRATION */ {
434                 BUG_ON(!name);
435                 memcpy(mle->u.name.name, name, namelen);
436                 mle->u.name.len = namelen;
437         }
438
439         /* copy off the node_map and register hb callbacks on our copy */
440         memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
441         memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
442         clear_bit(dlm->node_num, mle->vote_map);
443         clear_bit(dlm->node_num, mle->node_map);
444
445         /* attach the mle to the domain node up/down events */
446         __dlm_mle_attach_hb_events(dlm, mle);
447 }
448
449
450 /* returns 1 if found, 0 if not */
451 static int dlm_find_mle(struct dlm_ctxt *dlm,
452                         struct dlm_master_list_entry **mle,
453                         char *name, unsigned int namelen)
454 {
455         struct dlm_master_list_entry *tmpmle;
456         struct list_head *iter;
457
458         assert_spin_locked(&dlm->master_lock);
459
460         list_for_each(iter, &dlm->master_list) {
461                 tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
462                 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
463                         continue;
464                 dlm_get_mle(tmpmle);
465                 *mle = tmpmle;
466                 return 1;
467         }
468         return 0;
469 }
470
471 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
472 {
473         struct dlm_master_list_entry *mle;
474         struct list_head *iter;
475
476         assert_spin_locked(&dlm->spinlock);
477         
478         list_for_each(iter, &dlm->mle_hb_events) {
479                 mle = list_entry(iter, struct dlm_master_list_entry, 
480                                  hb_events);
481                 if (node_up)
482                         dlm_mle_node_up(dlm, mle, NULL, idx);
483                 else
484                         dlm_mle_node_down(dlm, mle, NULL, idx);
485         }
486 }
487
488 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
489                               struct dlm_master_list_entry *mle,
490                               struct o2nm_node *node, int idx)
491 {
492         spin_lock(&mle->spinlock);
493
494         if (!test_bit(idx, mle->node_map))
495                 mlog(0, "node %u already removed from nodemap!\n", idx);
496         else
497                 clear_bit(idx, mle->node_map);
498
499         spin_unlock(&mle->spinlock);
500 }
501
502 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
503                             struct dlm_master_list_entry *mle,
504                             struct o2nm_node *node, int idx)
505 {
506         spin_lock(&mle->spinlock);
507
508         if (test_bit(idx, mle->node_map))
509                 mlog(0, "node %u already in node map!\n", idx);
510         else
511                 set_bit(idx, mle->node_map);
512
513         spin_unlock(&mle->spinlock);
514 }
515
516
517 int dlm_init_mle_cache(void)
518 {
519         dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
520                                           sizeof(struct dlm_master_list_entry),
521                                           0, SLAB_HWCACHE_ALIGN,
522                                           NULL, NULL);
523         if (dlm_mle_cache == NULL)
524                 return -ENOMEM;
525         return 0;
526 }
527
528 void dlm_destroy_mle_cache(void)
529 {
530         if (dlm_mle_cache)
531                 kmem_cache_destroy(dlm_mle_cache);
532 }
533
534 static void dlm_mle_release(struct kref *kref)
535 {
536         struct dlm_master_list_entry *mle;
537         struct dlm_ctxt *dlm;
538
539         mlog_entry_void();
540
541         mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
542         dlm = mle->dlm;
543
544         if (mle->type != DLM_MLE_MASTER) {
545                 mlog(0, "calling mle_release for %.*s, type %d\n",
546                      mle->u.name.len, mle->u.name.name, mle->type);
547         } else {
548                 mlog(0, "calling mle_release for %.*s, type %d\n",
549                      mle->u.res->lockname.len,
550                      mle->u.res->lockname.name, mle->type);
551         }
552         assert_spin_locked(&dlm->spinlock);
553         assert_spin_locked(&dlm->master_lock);
554
555         /* remove from list if not already */
556         if (!list_empty(&mle->list))
557                 list_del_init(&mle->list);
558
559         /* detach the mle from the domain node up/down events */
560         __dlm_mle_detach_hb_events(dlm, mle);
561
562         /* NOTE: kfree under spinlock here.
563          * if this is bad, we can move this to a freelist. */
564         kmem_cache_free(dlm_mle_cache, mle);
565 }
566
567
568 /*
569  * LOCK RESOURCE FUNCTIONS
570  */
571
572 static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
573                                   struct dlm_lock_resource *res,
574                                   u8 owner)
575 {
576         assert_spin_locked(&res->spinlock);
577
578         mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);
579
580         if (owner == dlm->node_num)
581                 atomic_inc(&dlm->local_resources);
582         else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
583                 atomic_inc(&dlm->unknown_resources);
584         else
585                 atomic_inc(&dlm->remote_resources);
586
587         res->owner = owner;
588 }
589
590 void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
591                               struct dlm_lock_resource *res, u8 owner)
592 {
593         assert_spin_locked(&res->spinlock);
594
595         if (owner == res->owner)
596                 return;
597
598         if (res->owner == dlm->node_num)
599                 atomic_dec(&dlm->local_resources);
600         else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
601                 atomic_dec(&dlm->unknown_resources);
602         else
603                 atomic_dec(&dlm->remote_resources);
604
605         dlm_set_lockres_owner(dlm, res, owner);
606 }
607
608
609 static void dlm_lockres_release(struct kref *kref)
610 {
611         struct dlm_lock_resource *res;
612
613         res = container_of(kref, struct dlm_lock_resource, refs);
614
615         /* This should not happen -- all lockres' have a name
616          * associated with them at init time. */
617         BUG_ON(!res->lockname.name);
618
619         mlog(0, "destroying lockres %.*s\n", res->lockname.len,
620              res->lockname.name);
621
622         if (!hlist_unhashed(&res->hash_node) ||
623             !list_empty(&res->granted) ||
624             !list_empty(&res->converting) ||
625             !list_empty(&res->blocked) ||
626             !list_empty(&res->dirty) ||
627             !list_empty(&res->recovering) ||
628             !list_empty(&res->purge)) {
629                 mlog(ML_ERROR,
630                      "Going to BUG for resource %.*s."
631                      "  We're on a list! [%c%c%c%c%c%c%c]\n",
632                      res->lockname.len, res->lockname.name,
633                      !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
634                      !list_empty(&res->granted) ? 'G' : ' ',
635                      !list_empty(&res->converting) ? 'C' : ' ',
636                      !list_empty(&res->blocked) ? 'B' : ' ',
637                      !list_empty(&res->dirty) ? 'D' : ' ',
638                      !list_empty(&res->recovering) ? 'R' : ' ',
639                      !list_empty(&res->purge) ? 'P' : ' ');
640
641                 dlm_print_one_lock_resource(res);
642         }
643
644         /* By the time we're ready to blow this guy away, we shouldn't
645          * be on any lists. */
646         BUG_ON(!hlist_unhashed(&res->hash_node));
647         BUG_ON(!list_empty(&res->granted));
648         BUG_ON(!list_empty(&res->converting));
649         BUG_ON(!list_empty(&res->blocked));
650         BUG_ON(!list_empty(&res->dirty));
651         BUG_ON(!list_empty(&res->recovering));
652         BUG_ON(!list_empty(&res->purge));
653
654         kfree(res->lockname.name);
655
656         kfree(res);
657 }
658
659 void dlm_lockres_put(struct dlm_lock_resource *res)
660 {
661         kref_put(&res->refs, dlm_lockres_release);
662 }
663
664 static void dlm_init_lockres(struct dlm_ctxt *dlm,
665                              struct dlm_lock_resource *res,
666                              const char *name, unsigned int namelen)
667 {
668         char *qname;
669
670         /* If we memset here, we lose our reference to the kmalloc'd
671          * res->lockname.name, so be sure to init every field
672          * correctly! */
673
674         qname = (char *) res->lockname.name;
675         memcpy(qname, name, namelen);
676
677         res->lockname.len = namelen;
678         res->lockname.hash = dlm_lockid_hash(name, namelen);
679
680         init_waitqueue_head(&res->wq);
681         spin_lock_init(&res->spinlock);
682         INIT_HLIST_NODE(&res->hash_node);
683         INIT_LIST_HEAD(&res->granted);
684         INIT_LIST_HEAD(&res->converting);
685         INIT_LIST_HEAD(&res->blocked);
686         INIT_LIST_HEAD(&res->dirty);
687         INIT_LIST_HEAD(&res->recovering);
688         INIT_LIST_HEAD(&res->purge);
689         atomic_set(&res->asts_reserved, 0);
690         res->migration_pending = 0;
691
692         kref_init(&res->refs);
693
694         /* just for consistency */
695         spin_lock(&res->spinlock);
696         dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
697         spin_unlock(&res->spinlock);
698
699         res->state = DLM_LOCK_RES_IN_PROGRESS;
700
701         res->last_used = 0;
702
703         memset(res->lvb, 0, DLM_LVB_LEN);
704 }
705
706 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
707                                    const char *name,
708                                    unsigned int namelen)
709 {
710         struct dlm_lock_resource *res;
711
712         res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
713         if (!res)
714                 return NULL;
715
716         res->lockname.name = kmalloc(namelen, GFP_KERNEL);
717         if (!res->lockname.name) {
718                 kfree(res);
719                 return NULL;
720         }
721
722         dlm_init_lockres(dlm, res, name, namelen);
723         return res;
724 }
725
726 /*
727  * lookup a lock resource by name.
728  * may already exist in the hashtable.
729  * lockid is null terminated
730  *
731  * if not, allocate enough for the lockres and for
732  * the temporary structure used in doing the mastering.
733  *
734  * also, do a lookup in the dlm->master_list to see
735  * if another node has begun mastering the same lock.
736  * if so, there should be a block entry in there
737  * for this name, and we should *not* attempt to master
738  * the lock here.   need to wait around for that node
739  * to assert_master (or die).
740  *
741  */
742 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
743                                           const char *lockid,
744                                           int flags)
745 {
746         struct dlm_lock_resource *tmpres=NULL, *res=NULL;
747         struct dlm_master_list_entry *mle = NULL;
748         struct dlm_master_list_entry *alloc_mle = NULL;
749         int blocked = 0;
750         int ret, nodenum;
751         struct dlm_node_iter iter;
752         unsigned int namelen, hash;
753         int tries = 0;
754         int bit, wait_on_recovery = 0;
755
756         BUG_ON(!lockid);
757
758         namelen = strlen(lockid);
759         hash = dlm_lockid_hash(lockid, namelen);
760
761         mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
762
763 lookup:
764         spin_lock(&dlm->spinlock);
765         tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
766         if (tmpres) {
767                 spin_unlock(&dlm->spinlock);
768                 mlog(0, "found in hash!\n");
769                 if (res)
770                         dlm_lockres_put(res);
771                 res = tmpres;
772                 goto leave;
773         }
774
775         if (!res) {
776                 spin_unlock(&dlm->spinlock);
777                 mlog(0, "allocating a new resource\n");
778                 /* nothing found and we need to allocate one. */
779                 alloc_mle = (struct dlm_master_list_entry *)
780                         kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
781                 if (!alloc_mle)
782                         goto leave;
783                 res = dlm_new_lockres(dlm, lockid, namelen);
784                 if (!res)
785                         goto leave;
786                 goto lookup;
787         }
788
789         mlog(0, "no lockres found, allocated our own: %p\n", res);
790
791         if (flags & LKM_LOCAL) {
792                 /* caller knows it's safe to assume it's not mastered elsewhere
793                  * DONE!  return right away */
794                 spin_lock(&res->spinlock);
795                 dlm_change_lockres_owner(dlm, res, dlm->node_num);
796                 __dlm_insert_lockres(dlm, res);
797                 spin_unlock(&res->spinlock);
798                 spin_unlock(&dlm->spinlock);
799                 /* lockres still marked IN_PROGRESS */
800                 goto wake_waiters;
801         }
802
803         /* check master list to see if another node has started mastering it */
804         spin_lock(&dlm->master_lock);
805
806         /* if we found a block, wait for lock to be mastered by another node */
807         blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
808         if (blocked) {
809                 if (mle->type == DLM_MLE_MASTER) {
810                         mlog(ML_ERROR, "master entry for nonexistent lock!\n");
811                         BUG();
812                 } else if (mle->type == DLM_MLE_MIGRATION) {
813                         /* migration is in progress! */
814                         /* the good news is that we now know the
815                          * "current" master (mle->master). */
816
817                         spin_unlock(&dlm->master_lock);
818                         assert_spin_locked(&dlm->spinlock);
819
820                         /* set the lockres owner and hash it */
821                         spin_lock(&res->spinlock);
822                         dlm_set_lockres_owner(dlm, res, mle->master);
823                         __dlm_insert_lockres(dlm, res);
824                         spin_unlock(&res->spinlock);
825                         spin_unlock(&dlm->spinlock);
826
827                         /* master is known, detach */
828                         dlm_mle_detach_hb_events(dlm, mle);
829                         dlm_put_mle(mle);
830                         mle = NULL;
831                         goto wake_waiters;
832                 }
833         } else {
834                 /* go ahead and try to master lock on this node */
835                 mle = alloc_mle;
836                 /* make sure this does not get freed below */
837                 alloc_mle = NULL;
838                 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
839                 set_bit(dlm->node_num, mle->maybe_map);
840                 list_add(&mle->list, &dlm->master_list);
841
842                 /* still holding the dlm spinlock, check the recovery map
843                  * to see if there are any nodes that still need to be 
844                  * considered.  these will not appear in the mle nodemap
845                  * but they might own this lockres.  wait on them. */
846                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
847                 if (bit < O2NM_MAX_NODES) {
848                         mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
849                              "recover before lock mastery can begin\n",
850                              dlm->name, namelen, (char *)lockid, bit);
851                         wait_on_recovery = 1;
852                 }
853         }
854
855         /* at this point there is either a DLM_MLE_BLOCK or a
856          * DLM_MLE_MASTER on the master list, so it's safe to add the
857          * lockres to the hashtable.  anyone who finds the lock will
858          * still have to wait on the IN_PROGRESS. */
859
860         /* finally add the lockres to its hash bucket */
861         __dlm_insert_lockres(dlm, res);
862         /* get an extra ref on the mle in case this is a BLOCK
863          * if so, the creator of the BLOCK may try to put the last
864          * ref at this time in the assert master handler, so we
865          * need an extra one to keep from a bad ptr deref. */
866         dlm_get_mle_inuse(mle);
867         spin_unlock(&dlm->master_lock);
868         spin_unlock(&dlm->spinlock);
869
870         while (wait_on_recovery) {
871                 /* any cluster changes that occurred after dropping the
872                  * dlm spinlock would be detectable be a change on the mle,
873                  * so we only need to clear out the recovery map once. */
874                 if (dlm_is_recovery_lock(lockid, namelen)) {
875                         mlog(ML_NOTICE, "%s: recovery map is not empty, but "
876                              "must master $RECOVERY lock now\n", dlm->name);
877                         if (!dlm_pre_master_reco_lockres(dlm, res))
878                                 wait_on_recovery = 0;
879                         else {
880                                 mlog(0, "%s: waiting 500ms for heartbeat state "
881                                     "change\n", dlm->name);
882                                 msleep(500);
883                         }
884                         continue;
885                 } 
886
887                 dlm_kick_recovery_thread(dlm);
888                 msleep(100);
889                 dlm_wait_for_recovery(dlm);
890
891                 spin_lock(&dlm->spinlock);
892                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
893                 if (bit < O2NM_MAX_NODES) {
894                         mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
895                              "recover before lock mastery can begin\n",
896                              dlm->name, namelen, (char *)lockid, bit);
897                         wait_on_recovery = 1;
898                 } else
899                         wait_on_recovery = 0;
900                 spin_unlock(&dlm->spinlock);
901         }
902
903         /* must wait for lock to be mastered elsewhere */
904         if (blocked)
905                 goto wait;
906
907 redo_request:
908         ret = -EINVAL;
909         dlm_node_iter_init(mle->vote_map, &iter);
910         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
911                 ret = dlm_do_master_request(mle, nodenum);
912                 if (ret < 0)
913                         mlog_errno(ret);
914                 if (mle->master != O2NM_MAX_NODES) {
915                         /* found a master ! */
916                         if (mle->master <= nodenum)
917                                 break;
918                         /* if our master request has not reached the master
919                          * yet, keep going until it does.  this is how the
920                          * master will know that asserts are needed back to
921                          * the lower nodes. */
922                         mlog(0, "%s:%.*s: requests only up to %u but master "
923                              "is %u, keep going\n", dlm->name, namelen,
924                              lockid, nodenum, mle->master);
925                 }
926         }
927
928 wait:
929         /* keep going until the response map includes all nodes */
930         ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
931         if (ret < 0) {
932                 mlog(0, "%s:%.*s: node map changed, redo the "
933                      "master request now, blocked=%d\n",
934                      dlm->name, res->lockname.len,
935                      res->lockname.name, blocked);
936                 if (++tries > 20) {
937                         mlog(ML_ERROR, "%s:%.*s: spinning on "
938                              "dlm_wait_for_lock_mastery, blocked=%d\n", 
939                              dlm->name, res->lockname.len, 
940                              res->lockname.name, blocked);
941                         dlm_print_one_lock_resource(res);
942                         /* dlm_print_one_mle(mle); */
943                         tries = 0;
944                 }
945                 goto redo_request;
946         }
947
948         mlog(0, "lockres mastered by %u\n", res->owner);
949         /* make sure we never continue without this */
950         BUG_ON(res->owner == O2NM_MAX_NODES);
951
952         /* master is known, detach if not already detached */
953         dlm_mle_detach_hb_events(dlm, mle);
954         dlm_put_mle(mle);
955         /* put the extra ref */
956         dlm_put_mle_inuse(mle);
957
958 wake_waiters:
959         spin_lock(&res->spinlock);
960         res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
961         spin_unlock(&res->spinlock);
962         wake_up(&res->wq);
963
964 leave:
965         /* need to free the unused mle */
966         if (alloc_mle)
967                 kmem_cache_free(dlm_mle_cache, alloc_mle);
968
969         return res;
970 }
971
972
973 #define DLM_MASTERY_TIMEOUT_MS   5000
974
975 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
976                                      struct dlm_lock_resource *res,
977                                      struct dlm_master_list_entry *mle,
978                                      int *blocked)
979 {
980         u8 m;
981         int ret, bit;
982         int map_changed, voting_done;
983         int assert, sleep;
984
985 recheck:
986         ret = 0;
987         assert = 0;
988
989         /* check if another node has already become the owner */
990         spin_lock(&res->spinlock);
991         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
992                 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
993                      res->lockname.len, res->lockname.name, res->owner);
994                 spin_unlock(&res->spinlock);
995                 /* this will cause the master to re-assert across
996                  * the whole cluster, freeing up mles */
997                 if (res->owner != dlm->node_num) {
998                         ret = dlm_do_master_request(mle, res->owner);
999                         if (ret < 0) {
1000                                 /* give recovery a chance to run */
1001                                 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1002                                 msleep(500);
1003                                 goto recheck;
1004                         }
1005                 }
1006                 ret = 0;
1007                 goto leave;
1008         }
1009         spin_unlock(&res->spinlock);
1010
1011         spin_lock(&mle->spinlock);
1012         m = mle->master;
1013         map_changed = (memcmp(mle->vote_map, mle->node_map,
1014                               sizeof(mle->vote_map)) != 0);
1015         voting_done = (memcmp(mle->vote_map, mle->response_map,
1016                              sizeof(mle->vote_map)) == 0);
1017
1018         /* restart if we hit any errors */
1019         if (map_changed) {
1020                 int b;
1021                 mlog(0, "%s: %.*s: node map changed, restarting\n",
1022                      dlm->name, res->lockname.len, res->lockname.name);
1023                 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1024                 b = (mle->type == DLM_MLE_BLOCK);
1025                 if ((*blocked && !b) || (!*blocked && b)) {
1026                         mlog(0, "%s:%.*s: status change: old=%d new=%d\n", 
1027                              dlm->name, res->lockname.len, res->lockname.name,
1028                              *blocked, b);
1029                         *blocked = b;
1030                 }
1031                 spin_unlock(&mle->spinlock);
1032                 if (ret < 0) {
1033                         mlog_errno(ret);
1034                         goto leave;
1035                 }
1036                 mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1037                      "rechecking now\n", dlm->name, res->lockname.len,
1038                      res->lockname.name);
1039                 goto recheck;
1040         } else {
1041                 if (!voting_done) {
1042                         mlog(0, "map not changed and voting not done "
1043                              "for %s:%.*s\n", dlm->name, res->lockname.len,
1044                              res->lockname.name);
1045                 }
1046         }
1047
1048         if (m != O2NM_MAX_NODES) {
1049                 /* another node has done an assert!
1050                  * all done! */
1051                 sleep = 0;
1052         } else {
1053                 sleep = 1;
1054                 /* have all nodes responded? */
1055                 if (voting_done && !*blocked) {
1056                         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1057                         if (dlm->node_num <= bit) {
1058                                 /* my node number is lowest.
1059                                  * now tell other nodes that I am
1060                                  * mastering this. */
1061                                 mle->master = dlm->node_num;
1062                                 assert = 1;
1063                                 sleep = 0;
1064                         }
1065                         /* if voting is done, but we have not received
1066                          * an assert master yet, we must sleep */
1067                 }
1068         }
1069
1070         spin_unlock(&mle->spinlock);
1071
1072         /* sleep if we haven't finished voting yet */
1073         if (sleep) {
1074                 unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1075
1076                 /*
1077                 if (atomic_read(&mle->mle_refs.refcount) < 2)
1078                         mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1079                         atomic_read(&mle->mle_refs.refcount),
1080                         res->lockname.len, res->lockname.name);
1081                 */
1082                 atomic_set(&mle->woken, 0);
1083                 (void)wait_event_timeout(mle->wq,
1084                                          (atomic_read(&mle->woken) == 1),
1085                                          timeo);
1086                 if (res->owner == O2NM_MAX_NODES) {
1087                         mlog(0, "waiting again\n");
1088                         goto recheck;
1089                 }
1090                 mlog(0, "done waiting, master is %u\n", res->owner);
1091                 ret = 0;
1092                 goto leave;
1093         }
1094
1095         ret = 0;   /* done */
1096         if (assert) {
1097                 m = dlm->node_num;
1098                 mlog(0, "about to master %.*s here, this=%u\n",
1099                      res->lockname.len, res->lockname.name, m);
1100                 ret = dlm_do_assert_master(dlm, res->lockname.name,
1101                                            res->lockname.len, mle->vote_map, 0);
1102                 if (ret) {
1103                         /* This is a failure in the network path,
1104                          * not in the response to the assert_master
1105                          * (any nonzero response is a BUG on this node).
1106                          * Most likely a socket just got disconnected
1107                          * due to node death. */
1108                         mlog_errno(ret);
1109                 }
1110                 /* no longer need to restart lock mastery.
1111                  * all living nodes have been contacted. */
1112                 ret = 0;
1113         }
1114
1115         /* set the lockres owner */
1116         spin_lock(&res->spinlock);
1117         dlm_change_lockres_owner(dlm, res, m);
1118         spin_unlock(&res->spinlock);
1119
1120 leave:
1121         return ret;
1122 }
1123
1124 struct dlm_bitmap_diff_iter
1125 {
1126         int curnode;
1127         unsigned long *orig_bm;
1128         unsigned long *cur_bm;
1129         unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1130 };
1131
1132 enum dlm_node_state_change
1133 {
1134         NODE_DOWN = -1,
1135         NODE_NO_CHANGE = 0,
1136         NODE_UP
1137 };
1138
1139 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1140                                       unsigned long *orig_bm,
1141                                       unsigned long *cur_bm)
1142 {
1143         unsigned long p1, p2;
1144         int i;
1145
1146         iter->curnode = -1;
1147         iter->orig_bm = orig_bm;
1148         iter->cur_bm = cur_bm;
1149
1150         for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1151                 p1 = *(iter->orig_bm + i);
1152                 p2 = *(iter->cur_bm + i);
1153                 iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1154         }
1155 }
1156
1157 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1158                                      enum dlm_node_state_change *state)
1159 {
1160         int bit;
1161
1162         if (iter->curnode >= O2NM_MAX_NODES)
1163                 return -ENOENT;
1164
1165         bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1166                             iter->curnode+1);
1167         if (bit >= O2NM_MAX_NODES) {
1168                 iter->curnode = O2NM_MAX_NODES;
1169                 return -ENOENT;
1170         }
1171
1172         /* if it was there in the original then this node died */
1173         if (test_bit(bit, iter->orig_bm))
1174                 *state = NODE_DOWN;
1175         else
1176                 *state = NODE_UP;
1177
1178         iter->curnode = bit;
1179         return bit;
1180 }
1181
1182
1183 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1184                                     struct dlm_lock_resource *res,
1185                                     struct dlm_master_list_entry *mle,
1186                                     int blocked)
1187 {
1188         struct dlm_bitmap_diff_iter bdi;
1189         enum dlm_node_state_change sc;
1190         int node;
1191         int ret = 0;
1192
1193         mlog(0, "something happened such that the "
1194              "master process may need to be restarted!\n");
1195
1196         assert_spin_locked(&mle->spinlock);
1197
1198         dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1199         node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1200         while (node >= 0) {
1201                 if (sc == NODE_UP) {
1202                         /* a node came up.  clear any old vote from
1203                          * the response map and set it in the vote map
1204                          * then restart the mastery. */
1205                         mlog(ML_NOTICE, "node %d up while restarting\n", node);
1206
1207                         /* redo the master request, but only for the new node */
1208                         mlog(0, "sending request to new node\n");
1209                         clear_bit(node, mle->response_map);
1210                         set_bit(node, mle->vote_map);
1211                 } else {
1212                         mlog(ML_ERROR, "node down! %d\n", node);
1213
1214                         /* if the node wasn't involved in mastery skip it,
1215                          * but clear it out from the maps so that it will
1216                          * not affect mastery of this lockres */
1217                         clear_bit(node, mle->response_map);
1218                         clear_bit(node, mle->vote_map);
1219                         if (!test_bit(node, mle->maybe_map))
1220                                 goto next;
1221
1222                         /* if we're already blocked on lock mastery, and the
1223                          * dead node wasn't the expected master, or there is
1224                          * another node in the maybe_map, keep waiting */
1225                         if (blocked) {
1226                                 int lowest = find_next_bit(mle->maybe_map,
1227                                                        O2NM_MAX_NODES, 0);
1228
1229                                 /* act like it was never there */
1230                                 clear_bit(node, mle->maybe_map);
1231
1232                                 if (node != lowest)
1233                                         goto next;
1234
1235                                 mlog(ML_ERROR, "expected master %u died while "
1236                                      "this node was blocked waiting on it!\n",
1237                                      node);
1238                                 lowest = find_next_bit(mle->maybe_map,
1239                                                        O2NM_MAX_NODES,
1240                                                        lowest+1);
1241                                 if (lowest < O2NM_MAX_NODES) {
1242                                         mlog(0, "still blocked. waiting "
1243                                              "on %u now\n", lowest);
1244                                         goto next;
1245                                 }
1246
1247                                 /* mle is an MLE_BLOCK, but there is now
1248                                  * nothing left to block on.  we need to return
1249                                  * all the way back out and try again with
1250                                  * an MLE_MASTER. dlm_do_local_recovery_cleanup
1251                                  * has already run, so the mle refcount is ok */
1252                                 mlog(0, "no longer blocking. we can "
1253                                      "try to master this here\n");
1254                                 mle->type = DLM_MLE_MASTER;
1255                                 memset(mle->maybe_map, 0,
1256                                        sizeof(mle->maybe_map));
1257                                 memset(mle->response_map, 0,
1258                                        sizeof(mle->maybe_map));
1259                                 memcpy(mle->vote_map, mle->node_map,
1260                                        sizeof(mle->node_map));
1261                                 mle->u.res = res;
1262                                 set_bit(dlm->node_num, mle->maybe_map);
1263
1264                                 ret = -EAGAIN;
1265                                 goto next;
1266                         }
1267
1268                         clear_bit(node, mle->maybe_map);
1269                         if (node > dlm->node_num)
1270                                 goto next;
1271
1272                         mlog(0, "dead node in map!\n");
1273                         /* yuck. go back and re-contact all nodes
1274                          * in the vote_map, removing this node. */
1275                         memset(mle->response_map, 0,
1276                                sizeof(mle->response_map));
1277                 }
1278                 ret = -EAGAIN;
1279 next:
1280                 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1281         }
1282         return ret;
1283 }
1284
1285
1286 /*
1287  * DLM_MASTER_REQUEST_MSG
1288  *
1289  * returns: 0 on success,
1290  *          -errno on a network error
1291  *
1292  * on error, the caller should assume the target node is "dead"
1293  *
1294  */
1295
1296 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
1297 {
1298         struct dlm_ctxt *dlm = mle->dlm;
1299         struct dlm_master_request request;
1300         int ret, response=0, resend;
1301
1302         memset(&request, 0, sizeof(request));
1303         request.node_idx = dlm->node_num;
1304
1305         BUG_ON(mle->type == DLM_MLE_MIGRATION);
1306
1307         if (mle->type != DLM_MLE_MASTER) {
1308                 request.namelen = mle->u.name.len;
1309                 memcpy(request.name, mle->u.name.name, request.namelen);
1310         } else {
1311                 request.namelen = mle->u.res->lockname.len;
1312                 memcpy(request.name, mle->u.res->lockname.name,
1313                         request.namelen);
1314         }
1315
1316 again:
1317         ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1318                                  sizeof(request), to, &response);
1319         if (ret < 0)  {
1320                 if (ret == -ESRCH) {
1321                         /* should never happen */
1322                         mlog(ML_ERROR, "TCP stack not ready!\n");
1323                         BUG();
1324                 } else if (ret == -EINVAL) {
1325                         mlog(ML_ERROR, "bad args passed to o2net!\n");
1326                         BUG();
1327                 } else if (ret == -ENOMEM) {
1328                         mlog(ML_ERROR, "out of memory while trying to send "
1329                              "network message!  retrying\n");
1330                         /* this is totally crude */
1331                         msleep(50);
1332                         goto again;
1333                 } else if (!dlm_is_host_down(ret)) {
1334                         /* not a network error. bad. */
1335                         mlog_errno(ret);
1336                         mlog(ML_ERROR, "unhandled error!");
1337                         BUG();
1338                 }
1339                 /* all other errors should be network errors,
1340                  * and likely indicate node death */
1341                 mlog(ML_ERROR, "link to %d went down!\n", to);
1342                 goto out;
1343         }
1344
1345         ret = 0;
1346         resend = 0;
1347         spin_lock(&mle->spinlock);
1348         switch (response) {
1349                 case DLM_MASTER_RESP_YES:
1350                         set_bit(to, mle->response_map);
1351                         mlog(0, "node %u is the master, response=YES\n", to);
1352                         mle->master = to;
1353                         break;
1354                 case DLM_MASTER_RESP_NO:
1355                         mlog(0, "node %u not master, response=NO\n", to);
1356                         set_bit(to, mle->response_map);
1357                         break;
1358                 case DLM_MASTER_RESP_MAYBE:
1359                         mlog(0, "node %u not master, response=MAYBE\n", to);
1360                         set_bit(to, mle->response_map);
1361                         set_bit(to, mle->maybe_map);
1362                         break;
1363                 case DLM_MASTER_RESP_ERROR:
1364                         mlog(0, "node %u hit an error, resending\n", to);
1365                         resend = 1;
1366                         response = 0;
1367                         break;
1368                 default:
1369                         mlog(ML_ERROR, "bad response! %u\n", response);
1370                         BUG();
1371         }
1372         spin_unlock(&mle->spinlock);
1373         if (resend) {
1374                 /* this is also totally crude */
1375                 msleep(50);
1376                 goto again;
1377         }
1378
1379 out:
1380         return ret;
1381 }
1382
1383 /*
1384  * locks that can be taken here:
1385  * dlm->spinlock
1386  * res->spinlock
1387  * mle->spinlock
1388  * dlm->master_list
1389  *
1390  * if possible, TRIM THIS DOWN!!!
1391  */
1392 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1393 {
1394         u8 response = DLM_MASTER_RESP_MAYBE;
1395         struct dlm_ctxt *dlm = data;
1396         struct dlm_lock_resource *res = NULL;
1397         struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1398         struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1399         char *name;
1400         unsigned int namelen, hash;
1401         int found, ret;
1402         int set_maybe;
1403         int dispatch_assert = 0;
1404
1405         if (!dlm_grab(dlm))
1406                 return DLM_MASTER_RESP_NO;
1407
1408         if (!dlm_domain_fully_joined(dlm)) {
1409                 response = DLM_MASTER_RESP_NO;
1410                 goto send_response;
1411         }
1412
1413         name = request->name;
1414         namelen = request->namelen;
1415         hash = dlm_lockid_hash(name, namelen);
1416
1417         if (namelen > DLM_LOCKID_NAME_MAX) {
1418                 response = DLM_IVBUFLEN;
1419                 goto send_response;
1420         }
1421
1422 way_up_top:
1423         spin_lock(&dlm->spinlock);
1424         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1425         if (res) {
1426                 spin_unlock(&dlm->spinlock);
1427
1428                 /* take care of the easy cases up front */
1429                 spin_lock(&res->spinlock);
1430                 if (res->state & DLM_LOCK_RES_RECOVERING) {
1431                         spin_unlock(&res->spinlock);
1432                         mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1433                              "being recovered\n");
1434                         response = DLM_MASTER_RESP_ERROR;
1435                         if (mle)
1436                                 kmem_cache_free(dlm_mle_cache, mle);
1437                         goto send_response;
1438                 }
1439
1440                 if (res->owner == dlm->node_num) {
1441                         spin_unlock(&res->spinlock);
1442                         // mlog(0, "this node is the master\n");
1443                         response = DLM_MASTER_RESP_YES;
1444                         if (mle)
1445                                 kmem_cache_free(dlm_mle_cache, mle);
1446
1447                         /* this node is the owner.
1448                          * there is some extra work that needs to
1449                          * happen now.  the requesting node has
1450                          * caused all nodes up to this one to
1451                          * create mles.  this node now needs to
1452                          * go back and clean those up. */
1453                         dispatch_assert = 1;
1454                         goto send_response;
1455                 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1456                         spin_unlock(&res->spinlock);
1457                         // mlog(0, "node %u is the master\n", res->owner);
1458                         response = DLM_MASTER_RESP_NO;
1459                         if (mle)
1460                                 kmem_cache_free(dlm_mle_cache, mle);
1461                         goto send_response;
1462                 }
1463
1464                 /* ok, there is no owner.  either this node is
1465                  * being blocked, or it is actively trying to
1466                  * master this lock. */
1467                 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1468                         mlog(ML_ERROR, "lock with no owner should be "
1469                              "in-progress!\n");
1470                         BUG();
1471                 }
1472
1473                 // mlog(0, "lockres is in progress...\n");
1474                 spin_lock(&dlm->master_lock);
1475                 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1476                 if (!found) {
1477                         mlog(ML_ERROR, "no mle found for this lock!\n");
1478                         BUG();
1479                 }
1480                 set_maybe = 1;
1481                 spin_lock(&tmpmle->spinlock);
1482                 if (tmpmle->type == DLM_MLE_BLOCK) {
1483                         // mlog(0, "this node is waiting for "
1484                         // "lockres to be mastered\n");
1485                         response = DLM_MASTER_RESP_NO;
1486                 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1487                         mlog(0, "node %u is master, but trying to migrate to "
1488                              "node %u.\n", tmpmle->master, tmpmle->new_master);
1489                         if (tmpmle->master == dlm->node_num) {
1490                                 response = DLM_MASTER_RESP_YES;
1491                                 mlog(ML_ERROR, "no owner on lockres, but this "
1492                                      "node is trying to migrate it to %u?!\n",
1493                                      tmpmle->new_master);
1494                                 BUG();
1495                         } else {
1496                                 /* the real master can respond on its own */
1497                                 response = DLM_MASTER_RESP_NO;
1498                         }
1499                 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1500                         set_maybe = 0;
1501                         if (tmpmle->master == dlm->node_num) {
1502                                 response = DLM_MASTER_RESP_YES;
1503                                 /* this node will be the owner.
1504                                  * go back and clean the mles on any
1505                                  * other nodes */
1506                                 dispatch_assert = 1;
1507                         } else
1508                                 response = DLM_MASTER_RESP_NO;
1509                 } else {
1510                         // mlog(0, "this node is attempting to "
1511                         // "master lockres\n");
1512                         response = DLM_MASTER_RESP_MAYBE;
1513                 }
1514                 if (set_maybe)
1515                         set_bit(request->node_idx, tmpmle->maybe_map);
1516                 spin_unlock(&tmpmle->spinlock);
1517
1518                 spin_unlock(&dlm->master_lock);
1519                 spin_unlock(&res->spinlock);
1520
1521                 /* keep the mle attached to heartbeat events */
1522                 dlm_put_mle(tmpmle);
1523                 if (mle)
1524                         kmem_cache_free(dlm_mle_cache, mle);
1525                 goto send_response;
1526         }
1527
1528         /*
1529          * lockres doesn't exist on this node
1530          * if there is an MLE_BLOCK, return NO
1531          * if there is an MLE_MASTER, return MAYBE
1532          * otherwise, add an MLE_BLOCK, return NO
1533          */
1534         spin_lock(&dlm->master_lock);
1535         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1536         if (!found) {
1537                 /* this lockid has never been seen on this node yet */
1538                 // mlog(0, "no mle found\n");
1539                 if (!mle) {
1540                         spin_unlock(&dlm->master_lock);
1541                         spin_unlock(&dlm->spinlock);
1542
1543                         mle = (struct dlm_master_list_entry *)
1544                                 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
1545                         if (!mle) {
1546                                 response = DLM_MASTER_RESP_ERROR;
1547                                 mlog_errno(-ENOMEM);
1548                                 goto send_response;
1549                         }
1550                         goto way_up_top;
1551                 }
1552
1553                 // mlog(0, "this is second time thru, already allocated, "
1554                 // "add the block.\n");
1555                 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1556                 set_bit(request->node_idx, mle->maybe_map);
1557                 list_add(&mle->list, &dlm->master_list);
1558                 response = DLM_MASTER_RESP_NO;
1559         } else {
1560                 // mlog(0, "mle was found\n");
1561                 set_maybe = 1;
1562                 spin_lock(&tmpmle->spinlock);
1563                 if (tmpmle->master == dlm->node_num) {
1564                         mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1565                         BUG();
1566                 }
1567                 if (tmpmle->type == DLM_MLE_BLOCK)
1568                         response = DLM_MASTER_RESP_NO;
1569                 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1570                         mlog(0, "migration mle was found (%u->%u)\n",
1571                              tmpmle->master, tmpmle->new_master);
1572                         /* real master can respond on its own */
1573                         response = DLM_MASTER_RESP_NO;
1574                 } else
1575                         response = DLM_MASTER_RESP_MAYBE;
1576                 if (set_maybe)
1577                         set_bit(request->node_idx, tmpmle->maybe_map);
1578                 spin_unlock(&tmpmle->spinlock);
1579         }
1580         spin_unlock(&dlm->master_lock);
1581         spin_unlock(&dlm->spinlock);
1582
1583         if (found) {
1584                 /* keep the mle attached to heartbeat events */
1585                 dlm_put_mle(tmpmle);
1586         }
1587 send_response:
1588
1589         if (dispatch_assert) {
1590                 if (response != DLM_MASTER_RESP_YES)
1591                         mlog(ML_ERROR, "invalid response %d\n", response);
1592                 if (!res) {
1593                         mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1594                         BUG();
1595                 }
1596                 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1597                              dlm->node_num, res->lockname.len, res->lockname.name);
1598                 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 
1599                                                  DLM_ASSERT_MASTER_MLE_CLEANUP);
1600                 if (ret < 0) {
1601                         mlog(ML_ERROR, "failed to dispatch assert master work\n");
1602                         response = DLM_MASTER_RESP_ERROR;
1603                 }
1604         }
1605
1606         dlm_put(dlm);
1607         return response;
1608 }
1609
1610 /*
1611  * DLM_ASSERT_MASTER_MSG
1612  */
1613
1614
1615 /*
1616  * NOTE: this can be used for debugging
1617  * can periodically run all locks owned by this node
1618  * and re-assert across the cluster...
1619  */
1620 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1621                                 unsigned int namelen, void *nodemap,
1622                                 u32 flags)
1623 {
1624         struct dlm_assert_master assert;
1625         int to, tmpret;
1626         struct dlm_node_iter iter;
1627         int ret = 0;
1628         int reassert;
1629
1630         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1631 again:
1632         reassert = 0;
1633
1634         /* note that if this nodemap is empty, it returns 0 */
1635         dlm_node_iter_init(nodemap, &iter);
1636         while ((to = dlm_node_iter_next(&iter)) >= 0) {
1637                 int r = 0;
1638                 struct dlm_master_list_entry *mle = NULL;
1639
1640                 mlog(0, "sending assert master to %d (%.*s)\n", to,
1641                      namelen, lockname);
1642                 memset(&assert, 0, sizeof(assert));
1643                 assert.node_idx = dlm->node_num;
1644                 assert.namelen = namelen;
1645                 memcpy(assert.name, lockname, namelen);
1646                 assert.flags = cpu_to_be32(flags);
1647
1648                 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1649                                             &assert, sizeof(assert), to, &r);
1650                 if (tmpret < 0) {
1651                         mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
1652                         if (!dlm_is_host_down(tmpret)) {
1653                                 mlog(ML_ERROR, "unhandled error!\n");
1654                                 BUG();
1655                         }
1656                         /* a node died.  finish out the rest of the nodes. */
1657                         mlog(ML_ERROR, "link to %d went down!\n", to);
1658                         /* any nonzero status return will do */
1659                         ret = tmpret;
1660                 } else if (r < 0) {
1661                         /* ok, something horribly messed.  kill thyself. */
1662                         mlog(ML_ERROR,"during assert master of %.*s to %u, "
1663                              "got %d.\n", namelen, lockname, to, r);
1664                         spin_lock(&dlm->spinlock);
1665                         spin_lock(&dlm->master_lock);
1666                         if (dlm_find_mle(dlm, &mle, (char *)lockname,
1667                                          namelen)) {
1668                                 dlm_print_one_mle(mle);
1669                                 __dlm_put_mle(mle);
1670                         }
1671                         spin_unlock(&dlm->master_lock);
1672                         spin_unlock(&dlm->spinlock);
1673                         BUG();
1674                 } else if (r == EAGAIN) {
1675                         mlog(0, "%.*s: node %u create mles on other "
1676                              "nodes and requests a re-assert\n", 
1677                              namelen, lockname, to);
1678                         reassert = 1;
1679                 }
1680         }
1681
1682         if (reassert)
1683                 goto again;
1684
1685         return ret;
1686 }
1687
1688 /*
1689  * locks that can be taken here:
1690  * dlm->spinlock
1691  * res->spinlock
1692  * mle->spinlock
1693  * dlm->master_list
1694  *
1695  * if possible, TRIM THIS DOWN!!!
1696  */
1697 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1698 {
1699         struct dlm_ctxt *dlm = data;
1700         struct dlm_master_list_entry *mle = NULL;
1701         struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1702         struct dlm_lock_resource *res = NULL;
1703         char *name;
1704         unsigned int namelen, hash;
1705         u32 flags;
1706         int master_request = 0;
1707         int ret = 0;
1708
1709         if (!dlm_grab(dlm))
1710                 return 0;
1711
1712         name = assert->name;
1713         namelen = assert->namelen;
1714         hash = dlm_lockid_hash(name, namelen);
1715         flags = be32_to_cpu(assert->flags);
1716
1717         if (namelen > DLM_LOCKID_NAME_MAX) {
1718                 mlog(ML_ERROR, "Invalid name length!");
1719                 goto done;
1720         }
1721
1722         spin_lock(&dlm->spinlock);
1723
1724         if (flags)
1725                 mlog(0, "assert_master with flags: %u\n", flags);
1726
1727         /* find the MLE */
1728         spin_lock(&dlm->master_lock);
1729         if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1730                 /* not an error, could be master just re-asserting */
1731                 mlog(0, "just got an assert_master from %u, but no "
1732                      "MLE for it! (%.*s)\n", assert->node_idx,
1733                      namelen, name);
1734         } else {
1735                 int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1736                 if (bit >= O2NM_MAX_NODES) {
1737                         /* not necessarily an error, though less likely.
1738                          * could be master just re-asserting. */
1739                         mlog(0, "no bits set in the maybe_map, but %u "
1740                              "is asserting! (%.*s)\n", assert->node_idx,
1741                              namelen, name);
1742                 } else if (bit != assert->node_idx) {
1743                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1744                                 mlog(0, "master %u was found, %u should "
1745                                      "back off\n", assert->node_idx, bit);
1746                         } else {
1747                                 /* with the fix for bug 569, a higher node
1748                                  * number winning the mastery will respond
1749                                  * YES to mastery requests, but this node
1750                                  * had no way of knowing.  let it pass. */
1751                                 mlog(0, "%u is the lowest node, "
1752                                      "%u is asserting. (%.*s)  %u must "
1753                                      "have begun after %u won.\n", bit,
1754                                      assert->node_idx, namelen, name, bit,
1755                                      assert->node_idx);
1756                         }
1757                 }
1758                 if (mle->type == DLM_MLE_MIGRATION) {
1759                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1760                                 mlog(0, "%s:%.*s: got cleanup assert"
1761                                      " from %u for migration\n",
1762                                      dlm->name, namelen, name,
1763                                      assert->node_idx);
1764                         } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1765                                 mlog(0, "%s:%.*s: got unrelated assert"
1766                                      " from %u for migration, ignoring\n",
1767                                      dlm->name, namelen, name,
1768                                      assert->node_idx);
1769                                 __dlm_put_mle(mle);
1770                                 spin_unlock(&dlm->master_lock);
1771                                 spin_unlock(&dlm->spinlock);
1772                                 goto done;
1773                         }       
1774                 }
1775         }
1776         spin_unlock(&dlm->master_lock);
1777
1778         /* ok everything checks out with the MLE
1779          * now check to see if there is a lockres */
1780         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1781         if (res) {
1782                 spin_lock(&res->spinlock);
1783                 if (res->state & DLM_LOCK_RES_RECOVERING)  {
1784                         mlog(ML_ERROR, "%u asserting but %.*s is "
1785                              "RECOVERING!\n", assert->node_idx, namelen, name);
1786                         goto kill;
1787                 }
1788                 if (!mle) {
1789                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1790                             res->owner != assert->node_idx) {
1791                                 mlog(ML_ERROR, "assert_master from "
1792                                           "%u, but current owner is "
1793                                           "%u! (%.*s)\n",
1794                                        assert->node_idx, res->owner,
1795                                        namelen, name);
1796                                 goto kill;
1797                         }
1798                 } else if (mle->type != DLM_MLE_MIGRATION) {
1799                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1800                                 /* owner is just re-asserting */
1801                                 if (res->owner == assert->node_idx) {
1802                                         mlog(0, "owner %u re-asserting on "
1803                                              "lock %.*s\n", assert->node_idx,
1804                                              namelen, name);
1805                                         goto ok;
1806                                 }
1807                                 mlog(ML_ERROR, "got assert_master from "
1808                                      "node %u, but %u is the owner! "
1809                                      "(%.*s)\n", assert->node_idx,
1810                                      res->owner, namelen, name);
1811                                 goto kill;
1812                         }
1813                         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1814                                 mlog(ML_ERROR, "got assert from %u, but lock "
1815                                      "with no owner should be "
1816                                      "in-progress! (%.*s)\n",
1817                                      assert->node_idx,
1818                                      namelen, name);
1819                                 goto kill;
1820                         }
1821                 } else /* mle->type == DLM_MLE_MIGRATION */ {
1822                         /* should only be getting an assert from new master */
1823                         if (assert->node_idx != mle->new_master) {
1824                                 mlog(ML_ERROR, "got assert from %u, but "
1825                                      "new master is %u, and old master "
1826                                      "was %u (%.*s)\n",
1827                                      assert->node_idx, mle->new_master,
1828                                      mle->master, namelen, name);
1829                                 goto kill;
1830                         }
1831
1832                 }
1833 ok:
1834                 spin_unlock(&res->spinlock);
1835         }
1836         spin_unlock(&dlm->spinlock);
1837
1838         // mlog(0, "woo!  got an assert_master from node %u!\n",
1839         //           assert->node_idx);
1840         if (mle) {
1841                 int extra_ref = 0;
1842                 int nn = -1;
1843                 int rr, err = 0;
1844                 
1845                 spin_lock(&mle->spinlock);
1846                 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1847                         extra_ref = 1;
1848                 else {
1849                         /* MASTER mle: if any bits set in the response map
1850                          * then the calling node needs to re-assert to clear
1851                          * up nodes that this node contacted */
1852                         while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, 
1853                                                     nn+1)) < O2NM_MAX_NODES) {
1854                                 if (nn != dlm->node_num && nn != assert->node_idx)
1855                                         master_request = 1;
1856                         }
1857                 }
1858                 mle->master = assert->node_idx;
1859                 atomic_set(&mle->woken, 1);
1860                 wake_up(&mle->wq);
1861                 spin_unlock(&mle->spinlock);
1862
1863                 if (res) {
1864                         spin_lock(&res->spinlock);
1865                         if (mle->type == DLM_MLE_MIGRATION) {
1866                                 mlog(0, "finishing off migration of lockres %.*s, "
1867                                         "from %u to %u\n",
1868                                         res->lockname.len, res->lockname.name,
1869                                         dlm->node_num, mle->new_master);
1870                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
1871                                 dlm_change_lockres_owner(dlm, res, mle->new_master);
1872                                 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1873                         } else {
1874                                 dlm_change_lockres_owner(dlm, res, mle->master);
1875                         }
1876                         spin_unlock(&res->spinlock);
1877                 }
1878
1879                 /* master is known, detach if not already detached.
1880                  * ensures that only one assert_master call will happen
1881                  * on this mle. */
1882                 spin_lock(&dlm->spinlock);
1883                 spin_lock(&dlm->master_lock);
1884
1885                 rr = atomic_read(&mle->mle_refs.refcount);
1886                 if (mle->inuse > 0) {
1887                         if (extra_ref && rr < 3)
1888                                 err = 1;
1889                         else if (!extra_ref && rr < 2)
1890                                 err = 1;
1891                 } else {
1892                         if (extra_ref && rr < 2)
1893                                 err = 1;
1894                         else if (!extra_ref && rr < 1)
1895                                 err = 1;
1896                 }
1897                 if (err) {
1898                         mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1899                              "that will mess up this node, refs=%d, extra=%d, "
1900                              "inuse=%d\n", dlm->name, namelen, name,
1901                              assert->node_idx, rr, extra_ref, mle->inuse);
1902                         dlm_print_one_mle(mle);
1903                 }
1904                 list_del_init(&mle->list);
1905                 __dlm_mle_detach_hb_events(dlm, mle);
1906                 __dlm_put_mle(mle);
1907                 if (extra_ref) {
1908                         /* the assert master message now balances the extra
1909                          * ref given by the master / migration request message.
1910                          * if this is the last put, it will be removed
1911                          * from the list. */
1912                         __dlm_put_mle(mle);
1913                 }
1914                 spin_unlock(&dlm->master_lock);
1915                 spin_unlock(&dlm->spinlock);
1916         } else if (res) {
1917                 if (res->owner != assert->node_idx) {
1918                         mlog(0, "assert_master from %u, but current "
1919                              "owner is %u (%.*s), no mle\n", assert->node_idx,
1920                              res->owner, namelen, name);
1921                 }
1922         }
1923
1924 done:
1925         ret = 0;
1926         if (res)
1927                 dlm_lockres_put(res);
1928         dlm_put(dlm);
1929         if (master_request) {
1930                 mlog(0, "need to tell master to reassert\n");
1931                 ret = EAGAIN;  // positive. negative would shoot down the node.
1932         }
1933         return ret;
1934
1935 kill:
1936         /* kill the caller! */
1937         mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
1938              "and killing the other node now!  This node is OK and can continue.\n");
1939         __dlm_print_one_lock_resource(res);
1940         spin_unlock(&res->spinlock);
1941         spin_unlock(&dlm->spinlock);
1942         dlm_lockres_put(res);
1943         dlm_put(dlm);
1944         return -EINVAL;
1945 }
1946
1947 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1948                                struct dlm_lock_resource *res,
1949                                int ignore_higher, u8 request_from, u32 flags)
1950 {
1951         struct dlm_work_item *item;
1952         item = kcalloc(1, sizeof(*item), GFP_KERNEL);
1953         if (!item)
1954                 return -ENOMEM;
1955
1956
1957         /* queue up work for dlm_assert_master_worker */
1958         dlm_grab(dlm);  /* get an extra ref for the work item */
1959         dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
1960         item->u.am.lockres = res; /* already have a ref */
1961         /* can optionally ignore node numbers higher than this node */
1962         item->u.am.ignore_higher = ignore_higher;
1963         item->u.am.request_from = request_from;
1964         item->u.am.flags = flags;
1965
1966         if (ignore_higher) 
1967                 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, 
1968                      res->lockname.name);
1969                 
1970         spin_lock(&dlm->work_lock);
1971         list_add_tail(&item->list, &dlm->work_list);
1972         spin_unlock(&dlm->work_lock);
1973
1974         schedule_work(&dlm->dispatched_work);
1975         return 0;
1976 }
1977
1978 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1979 {
1980         struct dlm_ctxt *dlm = data;
1981         int ret = 0;
1982         struct dlm_lock_resource *res;
1983         unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1984         int ignore_higher;
1985         int bit;
1986         u8 request_from;
1987         u32 flags;
1988
1989         dlm = item->dlm;
1990         res = item->u.am.lockres;
1991         ignore_higher = item->u.am.ignore_higher;
1992         request_from = item->u.am.request_from;
1993         flags = item->u.am.flags;
1994
1995         spin_lock(&dlm->spinlock);
1996         memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
1997         spin_unlock(&dlm->spinlock);
1998
1999         clear_bit(dlm->node_num, nodemap);
2000         if (ignore_higher) {
2001                 /* if is this just to clear up mles for nodes below
2002                  * this node, do not send the message to the original
2003                  * caller or any node number higher than this */
2004                 clear_bit(request_from, nodemap);
2005                 bit = dlm->node_num;
2006                 while (1) {
2007                         bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2008                                             bit+1);
2009                         if (bit >= O2NM_MAX_NODES)
2010                                 break;
2011                         clear_bit(bit, nodemap);
2012                 }
2013         }
2014
2015         /* this call now finishes out the nodemap
2016          * even if one or more nodes die */
2017         mlog(0, "worker about to master %.*s here, this=%u\n",
2018                      res->lockname.len, res->lockname.name, dlm->node_num);
2019         ret = dlm_do_assert_master(dlm, res->lockname.name,
2020                                    res->lockname.len,
2021                                    nodemap, flags);
2022         if (ret < 0) {
2023                 /* no need to restart, we are done */
2024                 mlog_errno(ret);
2025         }
2026
2027         dlm_lockres_put(res);
2028
2029         mlog(0, "finished with dlm_assert_master_worker\n");
2030 }
2031
2032 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2033  * We cannot wait for node recovery to complete to begin mastering this
2034  * lockres because this lockres is used to kick off recovery! ;-)
2035  * So, do a pre-check on all living nodes to see if any of those nodes
2036  * think that $RECOVERY is currently mastered by a dead node.  If so,
2037  * we wait a short time to allow that node to get notified by its own
2038  * heartbeat stack, then check again.  All $RECOVERY lock resources
2039  * mastered by dead nodes are purged when the hearbeat callback is 
2040  * fired, so we can know for sure that it is safe to continue once
2041  * the node returns a live node or no node.  */
2042 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2043                                        struct dlm_lock_resource *res)
2044 {
2045         struct dlm_node_iter iter;
2046         int nodenum;
2047         int ret = 0;
2048         u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2049
2050         spin_lock(&dlm->spinlock);
2051         dlm_node_iter_init(dlm->domain_map, &iter);
2052         spin_unlock(&dlm->spinlock);
2053
2054         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2055                 /* do not send to self */
2056                 if (nodenum == dlm->node_num)
2057                         continue;
2058                 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2059                 if (ret < 0) {
2060                         mlog_errno(ret);
2061                         if (!dlm_is_host_down(ret))
2062                                 BUG();
2063                         /* host is down, so answer for that node would be
2064                          * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2065                 }
2066
2067                 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2068                         /* check to see if this master is in the recovery map */
2069                         spin_lock(&dlm->spinlock);
2070                         if (test_bit(master, dlm->recovery_map)) {
2071                                 mlog(ML_NOTICE, "%s: node %u has not seen "
2072                                      "node %u go down yet, and thinks the "
2073                                      "dead node is mastering the recovery "
2074                                      "lock.  must wait.\n", dlm->name,
2075                                      nodenum, master);
2076                                 ret = -EAGAIN;
2077                         }
2078                         spin_unlock(&dlm->spinlock);
2079                         mlog(0, "%s: reco lock master is %u\n", dlm->name, 
2080                              master);
2081                         break;
2082                 }
2083         }
2084         return ret;
2085 }
2086
2087
2088 /*
2089  * DLM_MIGRATE_LOCKRES
2090  */
2091
2092
2093 int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2094                         u8 target)
2095 {
2096         struct dlm_master_list_entry *mle = NULL;
2097         struct dlm_master_list_entry *oldmle = NULL;
2098         struct dlm_migratable_lockres *mres = NULL;
2099         int ret = -EINVAL;
2100         const char *name;
2101         unsigned int namelen;
2102         int mle_added = 0;
2103         struct list_head *queue, *iter;
2104         int i;
2105         struct dlm_lock *lock;
2106         int empty = 1;
2107
2108         if (!dlm_grab(dlm))
2109                 return -EINVAL;
2110
2111         name = res->lockname.name;
2112         namelen = res->lockname.len;
2113
2114         mlog(0, "migrating %.*s to %u\n", namelen, name, target);
2115
2116         /*
2117          * ensure this lockres is a proper candidate for migration
2118          */
2119         spin_lock(&res->spinlock);
2120         if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
2121                 mlog(0, "cannot migrate lockres with unknown owner!\n");
2122                 spin_unlock(&res->spinlock);
2123                 goto leave;
2124         }
2125         if (res->owner != dlm->node_num) {
2126                 mlog(0, "cannot migrate lockres this node doesn't own!\n");
2127                 spin_unlock(&res->spinlock);
2128                 goto leave;
2129         }
2130         mlog(0, "checking queues...\n");
2131         queue = &res->granted;
2132         for (i=0; i<3; i++) {
2133                 list_for_each(iter, queue) {
2134                         lock = list_entry (iter, struct dlm_lock, list);
2135                         empty = 0;
2136                         if (lock->ml.node == dlm->node_num) {
2137                                 mlog(0, "found a lock owned by this node "
2138                                      "still on the %s queue!  will not "
2139                                      "migrate this lockres\n",
2140                                      i==0 ? "granted" :
2141                                      (i==1 ? "converting" : "blocked"));
2142                                 spin_unlock(&res->spinlock);
2143                                 ret = -ENOTEMPTY;
2144                                 goto leave;
2145                         }
2146                 }
2147                 queue++;
2148         }
2149         mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
2150         spin_unlock(&res->spinlock);
2151
2152         /* no work to do */
2153         if (empty) {
2154                 mlog(0, "no locks were found on this lockres! done!\n");
2155                 ret = 0;
2156                 goto leave;
2157         }
2158
2159         /*
2160          * preallocate up front
2161          * if this fails, abort
2162          */
2163
2164         ret = -ENOMEM;
2165         mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
2166         if (!mres) {
2167                 mlog_errno(ret);
2168                 goto leave;
2169         }
2170
2171         mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2172                                                                 GFP_KERNEL);
2173         if (!mle) {
2174                 mlog_errno(ret);
2175                 goto leave;
2176         }
2177         ret = 0;
2178
2179         /*
2180          * find a node to migrate the lockres to
2181          */
2182
2183         mlog(0, "picking a migration node\n");
2184         spin_lock(&dlm->spinlock);
2185         /* pick a new node */
2186         if (!test_bit(target, dlm->domain_map) ||
2187             target >= O2NM_MAX_NODES) {
2188                 target = dlm_pick_migration_target(dlm, res);
2189         }
2190         mlog(0, "node %u chosen for migration\n", target);
2191
2192         if (target >= O2NM_MAX_NODES ||
2193             !test_bit(target, dlm->domain_map)) {
2194                 /* target chosen is not alive */
2195                 ret = -EINVAL;
2196         }
2197
2198         if (ret) {
2199                 spin_unlock(&dlm->spinlock);
2200                 goto fail;
2201         }
2202
2203         mlog(0, "continuing with target = %u\n", target);
2204
2205         /*
2206          * clear any existing master requests and
2207          * add the migration mle to the list
2208          */
2209         spin_lock(&dlm->master_lock);
2210         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2211                                     namelen, target, dlm->node_num);
2212         spin_unlock(&dlm->master_lock);
2213         spin_unlock(&dlm->spinlock);
2214
2215         if (ret == -EEXIST) {
2216                 mlog(0, "another process is already migrating it\n");
2217                 goto fail;
2218         }
2219         mle_added = 1;
2220
2221         /*
2222          * set the MIGRATING flag and flush asts
2223          * if we fail after this we need to re-dirty the lockres
2224          */
2225         if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2226                 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2227                      "the target went down.\n", res->lockname.len,
2228                      res->lockname.name, target);
2229                 spin_lock(&res->spinlock);
2230                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2231                 spin_unlock(&res->spinlock);
2232                 ret = -EINVAL;
2233         }
2234
2235 fail:
2236         if (oldmle) {
2237                 /* master is known, detach if not already detached */
2238                 dlm_mle_detach_hb_events(dlm, oldmle);
2239                 dlm_put_mle(oldmle);
2240         }
2241
2242         if (ret < 0) {
2243                 if (mle_added) {
2244                         dlm_mle_detach_hb_events(dlm, mle);
2245                         dlm_put_mle(mle);
2246                 } else if (mle) {
2247                         kmem_cache_free(dlm_mle_cache, mle);
2248                 }
2249                 goto leave;
2250         }
2251
2252         /*
2253          * at this point, we have a migration target, an mle
2254          * in the master list, and the MIGRATING flag set on
2255          * the lockres
2256          */
2257
2258
2259         /* get an extra reference on the mle.
2260          * otherwise the assert_master from the new
2261          * master will destroy this.
2262          * also, make sure that all callers of dlm_get_mle
2263          * take both dlm->spinlock and dlm->master_lock */
2264         spin_lock(&dlm->spinlock);
2265         spin_lock(&dlm->master_lock);
2266         dlm_get_mle_inuse(mle);
2267         spin_unlock(&dlm->master_lock);
2268         spin_unlock(&dlm->spinlock);
2269
2270         /* notify new node and send all lock state */
2271         /* call send_one_lockres with migration flag.
2272          * this serves as notice to the target node that a
2273          * migration is starting. */
2274         ret = dlm_send_one_lockres(dlm, res, mres, target,
2275                                    DLM_MRES_MIGRATION);
2276
2277         if (ret < 0) {
2278                 mlog(0, "migration to node %u failed with %d\n",
2279                      target, ret);
2280                 /* migration failed, detach and clean up mle */
2281                 dlm_mle_detach_hb_events(dlm, mle);
2282                 dlm_put_mle(mle);
2283                 dlm_put_mle_inuse(mle);
2284                 spin_lock(&res->spinlock);
2285                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2286                 spin_unlock(&res->spinlock);
2287                 goto leave;
2288         }
2289
2290         /* at this point, the target sends a message to all nodes,
2291          * (using dlm_do_migrate_request).  this node is skipped since
2292          * we had to put an mle in the list to begin the process.  this
2293          * node now waits for target to do an assert master.  this node
2294          * will be the last one notified, ensuring that the migration
2295          * is complete everywhere.  if the target dies while this is
2296          * going on, some nodes could potentially see the target as the
2297          * master, so it is important that my recovery finds the migration
2298          * mle and sets the master to UNKNONWN. */
2299
2300
2301         /* wait for new node to assert master */
2302         while (1) {
2303                 ret = wait_event_interruptible_timeout(mle->wq,
2304                                         (atomic_read(&mle->woken) == 1),
2305                                         msecs_to_jiffies(5000));
2306
2307                 if (ret >= 0) {
2308                         if (atomic_read(&mle->woken) == 1 ||
2309                             res->owner == target)
2310                                 break;
2311
2312                         mlog(0, "timed out during migration\n");
2313                         /* avoid hang during shutdown when migrating lockres 
2314                          * to a node which also goes down */
2315                         if (dlm_is_node_dead(dlm, target)) {
2316                                 mlog(0, "%s:%.*s: expected migration "
2317                                      "target %u is no longer up, restarting\n",
2318                                      dlm->name, res->lockname.len,
2319                                      res->lockname.name, target);
2320                                 ret = -ERESTARTSYS;
2321                         }
2322                 }
2323                 if (ret == -ERESTARTSYS) {
2324                         /* migration failed, detach and clean up mle */
2325                         dlm_mle_detach_hb_events(dlm, mle);
2326                         dlm_put_mle(mle);
2327                         dlm_put_mle_inuse(mle);
2328                         spin_lock(&res->spinlock);
2329                         res->state &= ~DLM_LOCK_RES_MIGRATING;
2330                         spin_unlock(&res->spinlock);
2331                         goto leave;
2332                 }
2333                 /* TODO: if node died: stop, clean up, return error */
2334         }
2335
2336         /* all done, set the owner, clear the flag */
2337         spin_lock(&res->spinlock);
2338         dlm_set_lockres_owner(dlm, res, target);
2339         res->state &= ~DLM_LOCK_RES_MIGRATING;
2340         dlm_remove_nonlocal_locks(dlm, res);
2341         spin_unlock(&res->spinlock);
2342         wake_up(&res->wq);
2343
2344         /* master is known, detach if not already detached */
2345         dlm_mle_detach_hb_events(dlm, mle);
2346         dlm_put_mle_inuse(mle);
2347         ret = 0;
2348
2349         dlm_lockres_calc_usage(dlm, res);
2350
2351 leave:
2352         /* re-dirty the lockres if we failed */
2353         if (ret < 0)
2354                 dlm_kick_thread(dlm, res);
2355
2356         /* TODO: cleanup */
2357         if (mres)
2358                 free_page((unsigned long)mres);
2359
2360         dlm_put(dlm);
2361
2362         mlog(0, "returning %d\n", ret);
2363         return ret;
2364 }
2365 EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
2366
2367 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2368 {
2369         int ret;
2370         spin_lock(&dlm->ast_lock);
2371         spin_lock(&lock->spinlock);
2372         ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2373         spin_unlock(&lock->spinlock);
2374         spin_unlock(&dlm->ast_lock);
2375         return ret;
2376 }
2377
2378 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2379                                      struct dlm_lock_resource *res,
2380                                      u8 mig_target)
2381 {
2382         int can_proceed;
2383         spin_lock(&res->spinlock);
2384         can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2385         spin_unlock(&res->spinlock);
2386
2387         /* target has died, so make the caller break out of the 
2388          * wait_event, but caller must recheck the domain_map */
2389         spin_lock(&dlm->spinlock);
2390         if (!test_bit(mig_target, dlm->domain_map))
2391                 can_proceed = 1;
2392         spin_unlock(&dlm->spinlock);
2393         return can_proceed;
2394 }
2395
2396 int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2397 {
2398         int ret;
2399         spin_lock(&res->spinlock);
2400         ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2401         spin_unlock(&res->spinlock);
2402         return ret;
2403 }
2404
2405
2406 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2407                                        struct dlm_lock_resource *res,
2408                                        u8 target)
2409 {
2410         int ret = 0;
2411
2412         mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2413                res->lockname.len, res->lockname.name, dlm->node_num,
2414                target);
2415         /* need to set MIGRATING flag on lockres.  this is done by
2416          * ensuring that all asts have been flushed for this lockres. */
2417         spin_lock(&res->spinlock);
2418         BUG_ON(res->migration_pending);
2419         res->migration_pending = 1;
2420         /* strategy is to reserve an extra ast then release
2421          * it below, letting the release do all of the work */
2422         __dlm_lockres_reserve_ast(res);
2423         spin_unlock(&res->spinlock);
2424
2425         /* now flush all the pending asts.. hang out for a bit */
2426         dlm_kick_thread(dlm, res);
2427         wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2428         dlm_lockres_release_ast(dlm, res);
2429
2430         mlog(0, "about to wait on migration_wq, dirty=%s\n",
2431                res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2432         /* if the extra ref we just put was the final one, this
2433          * will pass thru immediately.  otherwise, we need to wait
2434          * for the last ast to finish. */
2435 again:
2436         ret = wait_event_interruptible_timeout(dlm->migration_wq,
2437                    dlm_migration_can_proceed(dlm, res, target),
2438                    msecs_to_jiffies(1000));
2439         if (ret < 0) {
2440                 mlog(0, "woken again: migrating? %s, dead? %s\n",
2441                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2442                        test_bit(target, dlm->domain_map) ? "no":"yes");
2443         } else {
2444                 mlog(0, "all is well: migrating? %s, dead? %s\n",
2445                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2446                        test_bit(target, dlm->domain_map) ? "no":"yes");
2447         }
2448         if (!dlm_migration_can_proceed(dlm, res, target)) {
2449                 mlog(0, "trying again...\n");
2450                 goto again;
2451         }
2452
2453         /* did the target go down or die? */
2454         spin_lock(&dlm->spinlock);
2455         if (!test_bit(target, dlm->domain_map)) {
2456                 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2457                      target);
2458                 ret = -EHOSTDOWN;
2459         }
2460         spin_unlock(&dlm->spinlock);
2461
2462         /*
2463          * at this point:
2464          *
2465          *   o the DLM_LOCK_RES_MIGRATING flag is set
2466          *   o there are no pending asts on this lockres
2467          *   o all processes trying to reserve an ast on this
2468          *     lockres must wait for the MIGRATING flag to clear
2469          */
2470         return ret;
2471 }
2472
2473 /* last step in the migration process.
2474  * original master calls this to free all of the dlm_lock
2475  * structures that used to be for other nodes. */
2476 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2477                                       struct dlm_lock_resource *res)
2478 {
2479         struct list_head *iter, *iter2;
2480         struct list_head *queue = &res->granted;
2481         int i;
2482         struct dlm_lock *lock;
2483
2484         assert_spin_locked(&res->spinlock);
2485
2486         BUG_ON(res->owner == dlm->node_num);
2487
2488         for (i=0; i<3; i++) {
2489                 list_for_each_safe(iter, iter2, queue) {
2490                         lock = list_entry (iter, struct dlm_lock, list);
2491                         if (lock->ml.node != dlm->node_num) {
2492                                 mlog(0, "putting lock for node %u\n",
2493                                      lock->ml.node);
2494                                 /* be extra careful */
2495                                 BUG_ON(!list_empty(&lock->ast_list));
2496                                 BUG_ON(!list_empty(&lock->bast_list));
2497                                 BUG_ON(lock->ast_pending);
2498                                 BUG_ON(lock->bast_pending);
2499                                 list_del_init(&lock->list);
2500                                 dlm_lock_put(lock);
2501                         }
2502                 }
2503                 queue++;
2504         }
2505 }
2506
2507 /* for now this is not too intelligent.  we will
2508  * need stats to make this do the right thing.
2509  * this just finds the first lock on one of the
2510  * queues and uses that node as the target. */
2511 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2512                                     struct dlm_lock_resource *res)
2513 {
2514         int i;
2515         struct list_head *queue = &res->granted;
2516         struct list_head *iter;
2517         struct dlm_lock *lock;
2518         int nodenum;
2519
2520         assert_spin_locked(&dlm->spinlock);
2521
2522         spin_lock(&res->spinlock);
2523         for (i=0; i<3; i++) {
2524                 list_for_each(iter, queue) {
2525                         /* up to the caller to make sure this node
2526                          * is alive */
2527                         lock = list_entry (iter, struct dlm_lock, list);
2528                         if (lock->ml.node != dlm->node_num) {
2529                                 spin_unlock(&res->spinlock);
2530                                 return lock->ml.node;
2531                         }
2532                 }
2533                 queue++;
2534         }
2535         spin_unlock(&res->spinlock);
2536         mlog(0, "have not found a suitable target yet! checking domain map\n");
2537
2538         /* ok now we're getting desperate.  pick anyone alive. */
2539         nodenum = -1;
2540         while (1) {
2541                 nodenum = find_next_bit(dlm->domain_map,
2542                                         O2NM_MAX_NODES, nodenum+1);
2543                 mlog(0, "found %d in domain map\n", nodenum);
2544                 if (nodenum >= O2NM_MAX_NODES)
2545                         break;
2546                 if (nodenum != dlm->node_num) {
2547                         mlog(0, "picking %d\n", nodenum);
2548                         return nodenum;
2549                 }
2550         }
2551
2552         mlog(0, "giving up.  no master to migrate to\n");
2553         return DLM_LOCK_RES_OWNER_UNKNOWN;
2554 }
2555
2556
2557
2558 /* this is called by the new master once all lockres
2559  * data has been received */
2560 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2561                                   struct dlm_lock_resource *res,
2562                                   u8 master, u8 new_master,
2563                                   struct dlm_node_iter *iter)
2564 {
2565         struct dlm_migrate_request migrate;
2566         int ret, status = 0;
2567         int nodenum;
2568
2569         memset(&migrate, 0, sizeof(migrate));
2570         migrate.namelen = res->lockname.len;
2571         memcpy(migrate.name, res->lockname.name, migrate.namelen);
2572         migrate.new_master = new_master;
2573         migrate.master = master;
2574
2575         ret = 0;
2576
2577         /* send message to all nodes, except the master and myself */
2578         while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2579                 if (nodenum == master ||
2580                     nodenum == new_master)
2581                         continue;
2582
2583                 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2584                                          &migrate, sizeof(migrate), nodenum,
2585                                          &status);
2586                 if (ret < 0)
2587                         mlog_errno(ret);
2588                 else if (status < 0) {
2589                         mlog(0, "migrate request (node %u) returned %d!\n",
2590                              nodenum, status);
2591                         ret = status;
2592                 }
2593         }
2594
2595         if (ret < 0)
2596                 mlog_errno(ret);
2597
2598         mlog(0, "returning ret=%d\n", ret);
2599         return ret;
2600 }
2601
2602
2603 /* if there is an existing mle for this lockres, we now know who the master is.
2604  * (the one who sent us *this* message) we can clear it up right away.
2605  * since the process that put the mle on the list still has a reference to it,
2606  * we can unhash it now, set the master and wake the process.  as a result,
2607  * we will have no mle in the list to start with.  now we can add an mle for
2608  * the migration and this should be the only one found for those scanning the
2609  * list.  */
2610 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2611 {
2612         struct dlm_ctxt *dlm = data;
2613         struct dlm_lock_resource *res = NULL;
2614         struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2615         struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2616         const char *name;
2617         unsigned int namelen, hash;
2618         int ret = 0;
2619
2620         if (!dlm_grab(dlm))
2621                 return -EINVAL;
2622
2623         name = migrate->name;
2624         namelen = migrate->namelen;
2625         hash = dlm_lockid_hash(name, namelen);
2626
2627         /* preallocate.. if this fails, abort */
2628         mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2629                                                          GFP_KERNEL);
2630
2631         if (!mle) {
2632                 ret = -ENOMEM;
2633                 goto leave;
2634         }
2635
2636         /* check for pre-existing lock */
2637         spin_lock(&dlm->spinlock);
2638         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
2639         spin_lock(&dlm->master_lock);
2640
2641         if (res) {
2642                 spin_lock(&res->spinlock);
2643                 if (res->state & DLM_LOCK_RES_RECOVERING) {
2644                         /* if all is working ok, this can only mean that we got
2645                         * a migrate request from a node that we now see as
2646                         * dead.  what can we do here?  drop it to the floor? */
2647                         spin_unlock(&res->spinlock);
2648                         mlog(ML_ERROR, "Got a migrate request, but the "
2649                              "lockres is marked as recovering!");
2650                         kmem_cache_free(dlm_mle_cache, mle);
2651                         ret = -EINVAL; /* need a better solution */
2652                         goto unlock;
2653                 }
2654                 res->state |= DLM_LOCK_RES_MIGRATING;
2655                 spin_unlock(&res->spinlock);
2656         }
2657
2658         /* ignore status.  only nonzero status would BUG. */
2659         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
2660                                     name, namelen,
2661                                     migrate->new_master,
2662                                     migrate->master);
2663
2664 unlock:
2665         spin_unlock(&dlm->master_lock);
2666         spin_unlock(&dlm->spinlock);
2667
2668         if (oldmle) {
2669                 /* master is known, detach if not already detached */
2670                 dlm_mle_detach_hb_events(dlm, oldmle);
2671                 dlm_put_mle(oldmle);
2672         }
2673
2674         if (res)
2675                 dlm_lockres_put(res);
2676 leave:
2677         dlm_put(dlm);
2678         return ret;
2679 }
2680
2681 /* must be holding dlm->spinlock and dlm->master_lock
2682  * when adding a migration mle, we can clear any other mles
2683  * in the master list because we know with certainty that
2684  * the master is "master".  so we remove any old mle from
2685  * the list after setting it's master field, and then add
2686  * the new migration mle.  this way we can hold with the rule
2687  * of having only one mle for a given lock name at all times. */
2688 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2689                                  struct dlm_lock_resource *res,
2690                                  struct dlm_master_list_entry *mle,
2691                                  struct dlm_master_list_entry **oldmle,
2692                                  const char *name, unsigned int namelen,
2693                                  u8 new_master, u8 master)
2694 {
2695         int found;
2696         int ret = 0;
2697
2698         *oldmle = NULL;
2699
2700         mlog_entry_void();
2701
2702         assert_spin_locked(&dlm->spinlock);
2703         assert_spin_locked(&dlm->master_lock);
2704
2705         /* caller is responsible for any ref taken here on oldmle */
2706         found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
2707         if (found) {
2708                 struct dlm_master_list_entry *tmp = *oldmle;
2709                 spin_lock(&tmp->spinlock);
2710                 if (tmp->type == DLM_MLE_MIGRATION) {
2711                         if (master == dlm->node_num) {
2712                                 /* ah another process raced me to it */
2713                                 mlog(0, "tried to migrate %.*s, but some "
2714                                      "process beat me to it\n",
2715                                      namelen, name);
2716                                 ret = -EEXIST;
2717                         } else {
2718                                 /* bad.  2 NODES are trying to migrate! */
2719                                 mlog(ML_ERROR, "migration error  mle: "
2720                                      "master=%u new_master=%u // request: "
2721                                      "master=%u new_master=%u // "
2722                                      "lockres=%.*s\n",
2723                                      tmp->master, tmp->new_master,
2724                                      master, new_master,
2725                                      namelen, name);
2726                                 BUG();
2727                         }
2728                 } else {
2729                         /* this is essentially what assert_master does */
2730                         tmp->master = master;
2731                         atomic_set(&tmp->woken, 1);
2732                         wake_up(&tmp->wq);
2733                         /* remove it from the list so that only one
2734                          * mle will be found */
2735                         list_del_init(&tmp->list);
2736                         __dlm_mle_detach_hb_events(dlm, mle);
2737                 }
2738                 spin_unlock(&tmp->spinlock);
2739         }
2740
2741         /* now add a migration mle to the tail of the list */
2742         dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2743         mle->new_master = new_master;
2744         mle->master = master;
2745         /* do this for consistency with other mle types */
2746         set_bit(new_master, mle->maybe_map);
2747         list_add(&mle->list, &dlm->master_list);
2748
2749         return ret;
2750 }
2751
2752
2753 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2754 {
2755         struct list_head *iter, *iter2;
2756         struct dlm_master_list_entry *mle;
2757         struct dlm_lock_resource *res;
2758         unsigned int hash;
2759
2760         mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2761 top:
2762         assert_spin_locked(&dlm->spinlock);
2763
2764         /* clean the master list */
2765         spin_lock(&dlm->master_lock);
2766         list_for_each_safe(iter, iter2, &dlm->master_list) {
2767                 mle = list_entry(iter, struct dlm_master_list_entry, list);
2768
2769                 BUG_ON(mle->type != DLM_MLE_BLOCK &&
2770                        mle->type != DLM_MLE_MASTER &&
2771                        mle->type != DLM_MLE_MIGRATION);
2772
2773                 /* MASTER mles are initiated locally.  the waiting
2774                  * process will notice the node map change
2775                  * shortly.  let that happen as normal. */
2776                 if (mle->type == DLM_MLE_MASTER)
2777                         continue;
2778
2779
2780                 /* BLOCK mles are initiated by other nodes.
2781                  * need to clean up if the dead node would have
2782                  * been the master. */
2783                 if (mle->type == DLM_MLE_BLOCK) {
2784                         int bit;
2785
2786                         spin_lock(&mle->spinlock);
2787                         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
2788                         if (bit != dead_node) {
2789                                 mlog(0, "mle found, but dead node %u would "
2790                                      "not have been master\n", dead_node);
2791                                 spin_unlock(&mle->spinlock);
2792                         } else {
2793                                 /* must drop the refcount by one since the
2794                                  * assert_master will never arrive.  this
2795                                  * may result in the mle being unlinked and
2796                                  * freed, but there may still be a process
2797                                  * waiting in the dlmlock path which is fine. */
2798                                 mlog(ML_ERROR, "node %u was expected master\n",
2799                                      dead_node);
2800                                 atomic_set(&mle->woken, 1);
2801                                 spin_unlock(&mle->spinlock);
2802                                 wake_up(&mle->wq);
2803                                 /* do not need events any longer, so detach 
2804                                  * from heartbeat */
2805                                 __dlm_mle_detach_hb_events(dlm, mle);
2806                                 __dlm_put_mle(mle);
2807                         }
2808                         continue;
2809                 }
2810
2811                 /* everything else is a MIGRATION mle */
2812
2813                 /* the rule for MIGRATION mles is that the master
2814                  * becomes UNKNOWN if *either* the original or
2815                  * the new master dies.  all UNKNOWN lockreses
2816                  * are sent to whichever node becomes the recovery
2817                  * master.  the new master is responsible for
2818                  * determining if there is still a master for
2819                  * this lockres, or if he needs to take over
2820                  * mastery.  either way, this node should expect
2821                  * another message to resolve this. */
2822                 if (mle->master != dead_node &&
2823                     mle->new_master != dead_node)
2824                         continue;
2825
2826                 /* if we have reached this point, this mle needs to
2827                  * be removed from the list and freed. */
2828
2829                 /* remove from the list early.  NOTE: unlinking
2830                  * list_head while in list_for_each_safe */
2831                 __dlm_mle_detach_hb_events(dlm, mle);
2832                 spin_lock(&mle->spinlock);
2833                 list_del_init(&mle->list);
2834                 atomic_set(&mle->woken, 1);
2835                 spin_unlock(&mle->spinlock);
2836                 wake_up(&mle->wq);
2837
2838                 mlog(0, "%s: node %u died during migration from "
2839                      "%u to %u!\n", dlm->name, dead_node,
2840                      mle->master, mle->new_master);
2841                 /* if there is a lockres associated with this
2842                  * mle, find it and set its owner to UNKNOWN */
2843                 hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
2844                 res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2845                                            mle->u.name.len, hash);
2846                 if (res) {
2847                         /* unfortunately if we hit this rare case, our
2848                          * lock ordering is messed.  we need to drop
2849                          * the master lock so that we can take the
2850                          * lockres lock, meaning that we will have to
2851                          * restart from the head of list. */
2852                         spin_unlock(&dlm->master_lock);
2853
2854                         /* move lockres onto recovery list */
2855                         spin_lock(&res->spinlock);
2856                         dlm_set_lockres_owner(dlm, res,
2857                                         DLM_LOCK_RES_OWNER_UNKNOWN);
2858                         dlm_move_lockres_to_recovery_list(dlm, res);
2859                         spin_unlock(&res->spinlock);
2860                         dlm_lockres_put(res);
2861
2862                         /* about to get rid of mle, detach from heartbeat */
2863                         __dlm_mle_detach_hb_events(dlm, mle);
2864
2865                         /* dump the mle */
2866                         spin_lock(&dlm->master_lock);
2867                         __dlm_put_mle(mle);
2868                         spin_unlock(&dlm->master_lock);
2869
2870                         /* restart */
2871                         goto top;
2872                 }
2873
2874                 /* this may be the last reference */
2875                 __dlm_put_mle(mle);
2876         }
2877         spin_unlock(&dlm->master_lock);
2878 }
2879
2880
2881 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2882                          u8 old_master)
2883 {
2884         struct dlm_node_iter iter;
2885         int ret = 0;
2886
2887         spin_lock(&dlm->spinlock);
2888         dlm_node_iter_init(dlm->domain_map, &iter);
2889         clear_bit(old_master, iter.node_map);
2890         clear_bit(dlm->node_num, iter.node_map);
2891         spin_unlock(&dlm->spinlock);
2892
2893         mlog(0, "now time to do a migrate request to other nodes\n");
2894         ret = dlm_do_migrate_request(dlm, res, old_master,
2895                                      dlm->node_num, &iter);
2896         if (ret < 0) {
2897                 mlog_errno(ret);
2898                 goto leave;
2899         }
2900
2901         mlog(0, "doing assert master of %.*s to all except the original node\n",
2902              res->lockname.len, res->lockname.name);
2903         /* this call now finishes out the nodemap
2904          * even if one or more nodes die */
2905         ret = dlm_do_assert_master(dlm, res->lockname.name,
2906                                    res->lockname.len, iter.node_map,
2907                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
2908         if (ret < 0) {
2909                 /* no longer need to retry.  all living nodes contacted. */
2910                 mlog_errno(ret);
2911                 ret = 0;
2912         }
2913
2914         memset(iter.node_map, 0, sizeof(iter.node_map));
2915         set_bit(old_master, iter.node_map);
2916         mlog(0, "doing assert master of %.*s back to %u\n",
2917              res->lockname.len, res->lockname.name, old_master);
2918         ret = dlm_do_assert_master(dlm, res->lockname.name,
2919                                    res->lockname.len, iter.node_map,
2920                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
2921         if (ret < 0) {
2922                 mlog(0, "assert master to original master failed "
2923                      "with %d.\n", ret);
2924                 /* the only nonzero status here would be because of
2925                  * a dead original node.  we're done. */
2926                 ret = 0;
2927         }
2928
2929         /* all done, set the owner, clear the flag */
2930         spin_lock(&res->spinlock);
2931         dlm_set_lockres_owner(dlm, res, dlm->node_num);
2932         res->state &= ~DLM_LOCK_RES_MIGRATING;
2933         spin_unlock(&res->spinlock);
2934         /* re-dirty it on the new master */
2935         dlm_kick_thread(dlm, res);
2936         wake_up(&res->wq);
2937 leave:
2938         return ret;
2939 }
2940
2941 /*
2942  * LOCKRES AST REFCOUNT
2943  * this is integral to migration
2944  */
2945
2946 /* for future intent to call an ast, reserve one ahead of time.
2947  * this should be called only after waiting on the lockres
2948  * with dlm_wait_on_lockres, and while still holding the
2949  * spinlock after the call. */
2950 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
2951 {
2952         assert_spin_locked(&res->spinlock);
2953         if (res->state & DLM_LOCK_RES_MIGRATING) {
2954                 __dlm_print_one_lock_resource(res);
2955         }
2956         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2957
2958         atomic_inc(&res->asts_reserved);
2959 }
2960
2961 /*
2962  * used to drop the reserved ast, either because it went unused,
2963  * or because the ast/bast was actually called.
2964  *
2965  * also, if there is a pending migration on this lockres,
2966  * and this was the last pending ast on the lockres,
2967  * atomically set the MIGRATING flag before we drop the lock.
2968  * this is how we ensure that migration can proceed with no
2969  * asts in progress.  note that it is ok if the state of the
2970  * queues is such that a lock should be granted in the future
2971  * or that a bast should be fired, because the new master will
2972  * shuffle the lists on this lockres as soon as it is migrated.
2973  */
2974 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
2975                              struct dlm_lock_resource *res)
2976 {
2977         if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
2978                 return;
2979
2980         if (!res->migration_pending) {
2981                 spin_unlock(&res->spinlock);
2982                 return;
2983         }
2984
2985         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2986         res->migration_pending = 0;
2987         res->state |= DLM_LOCK_RES_MIGRATING;
2988         spin_unlock(&res->spinlock);
2989         wake_up(&res->wq);
2990         wake_up(&dlm->migration_wq);
2991 }