/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdebug.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

enum dlm_mle_type {
        DLM_MLE_BLOCK,
        DLM_MLE_MASTER,
        DLM_MLE_MIGRATION
};

struct dlm_lock_name
{
        u8 len;
        u8 name[DLM_LOCKID_NAME_MAX];
};

struct dlm_master_list_entry
{
        struct list_head list;
        struct list_head hb_events;
        struct dlm_ctxt *dlm;
        spinlock_t spinlock;
        wait_queue_head_t wq;
        atomic_t woken;
        struct kref mle_refs;
        unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        u8 master;
        u8 new_master;
        enum dlm_mle_type type;
        struct o2hb_callback_func mle_hb_up;
        struct o2hb_callback_func mle_hb_down;
        union {
                struct dlm_lock_resource *res;
                struct dlm_lock_name name;
        } u;
};

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node,
                              int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node,
                            int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
                                unsigned int namelen, void *nodemap,
                                u32 flags);

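/* compare an mle against a lock name within a dlm domain.  block and
 * migration entries carry the name inline, while master entries point
 * at the lock resource itself.  returns 1 on a match, 0 otherwise. */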
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle,
                                const char *name,
                                unsigned int namelen)
{
        struct dlm_lock_resource *res;

        if (dlm != mle->dlm)
                return 0;

        if (mle->type == DLM_MLE_BLOCK ||
            mle->type == DLM_MLE_MIGRATION) {
                if (namelen != mle->u.name.len ||
                    memcmp(name, mle->u.name.name, namelen) != 0)
                        return 0;
        } else {
                res = mle->u.res;
                if (namelen != res->lockname.len ||
                    memcmp(res->lockname.name, name, namelen) != 0)
                        return 0;
        }
        return 1;
}

#if 0
/* Code here is included but defined out as it aids debugging */

void dlm_print_one_mle(struct dlm_master_list_entry *mle)
{
        int i = 0, refs;
        char *type;
        char attached;
        u8 master;
        unsigned int namelen;
        const char *name;
        struct kref *k;

        k = &mle->mle_refs;
        if (mle->type == DLM_MLE_BLOCK)
                type = "BLK";
        else if (mle->type == DLM_MLE_MASTER)
                type = "MAS";
        else
                type = "MIG";
        refs = atomic_read(&k->refcount);
        master = mle->master;
        attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');

        if (mle->type != DLM_MLE_MASTER) {
                namelen = mle->u.name.len;
                name = mle->u.name.name;
        } else {
                namelen = mle->u.res->lockname.len;
                name = mle->u.res->lockname.name;
        }

        mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
                  i, type, refs, master, mle->new_master, attached,
                  namelen, namelen, name);
}

static void dlm_dump_mles(struct dlm_ctxt *dlm)
{
        struct dlm_master_list_entry *mle;
        struct list_head *iter;

        mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
        mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
        spin_lock(&dlm->master_lock);
        list_for_each(iter, &dlm->master_list) {
                mle = list_entry(iter, struct dlm_master_list_entry, list);
                dlm_print_one_mle(mle);
        }
        spin_unlock(&dlm->master_lock);
}

extern spinlock_t dlm_domain_lock;
extern struct list_head dlm_domains;

int dlm_dump_all_mles(const char __user *data, unsigned int len)
{
        struct list_head *iter;
        struct dlm_ctxt *dlm;

        spin_lock(&dlm_domain_lock);
        list_for_each(iter, &dlm_domains) {
                dlm = list_entry(iter, struct dlm_ctxt, list);
                mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
                dlm_dump_mles(dlm);
        }
        spin_unlock(&dlm_domain_lock);
        return len;
}
EXPORT_SYMBOL_GPL(dlm_dump_all_mles);

#endif  /*  0  */


static kmem_cache_t *dlm_mle_cache = NULL;


static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);


static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
                                 struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle,
                                 struct dlm_master_list_entry **oldmle,
                                 const char *name, unsigned int namelen,
                                 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res,
                                       u8 target);

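/* classify an errno from the networking code: returns 1 if it
 * indicates that the remote node is dead or the link to it is gone,
 * 0 for any other kind of error. */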
int dlm_is_host_down(int errno)
{
        switch (errno) {
                case -EBADF:
                case -ECONNREFUSED:
                case -ENOTCONN:
                case -ECONNRESET:
                case -EPIPE:
                case -EHOSTDOWN:
                case -EHOSTUNREACH:
                case -ETIMEDOUT:
                case -ECONNABORTED:
                case -ENETDOWN:
                case -ENETUNREACH:
                case -ENETRESET:
                case -ESHUTDOWN:
                case -ENOPROTOOPT:
                case -EINVAL:   /* if returned from our tcp code,
                                   this means there is no socket */
                        return 1;
        }
        return 0;
}


/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);

        list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        if (!list_empty(&mle->hb_events))
                list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                            struct dlm_master_list_entry *mle)
{
        spin_lock(&dlm->spinlock);
        __dlm_mle_detach_hb_events(dlm, mle);
        spin_unlock(&dlm->spinlock);
}

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        BUG_ON(!atomic_read(&mle->mle_refs.refcount));

        kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
        kref_get(&mle->mle_refs);
}

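/* initialize a freshly allocated mle.  the caller must hold
 * dlm->spinlock: the domain map is copied here and the mle is
 * attached to the domain heartbeat events before returning. */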
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen)
{
        assert_spin_locked(&dlm->spinlock);

        mle->dlm = dlm;
        mle->type = type;
        INIT_LIST_HEAD(&mle->list);
        INIT_LIST_HEAD(&mle->hb_events);
        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
        spin_lock_init(&mle->spinlock);
        init_waitqueue_head(&mle->wq);
        atomic_set(&mle->woken, 0);
        kref_init(&mle->mle_refs);
        memset(mle->response_map, 0, sizeof(mle->response_map));
        mle->master = O2NM_MAX_NODES;
        mle->new_master = O2NM_MAX_NODES;

        if (mle->type == DLM_MLE_MASTER) {
                BUG_ON(!res);
                mle->u.res = res;
        } else if (mle->type == DLM_MLE_BLOCK) {
                BUG_ON(!name);
                memcpy(mle->u.name.name, name, namelen);
                mle->u.name.len = namelen;
        } else /* DLM_MLE_MIGRATION */ {
                BUG_ON(!name);
                memcpy(mle->u.name.name, name, namelen);
                mle->u.name.len = namelen;
        }

        /* copy off the node_map and register hb callbacks on our copy */
        memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
        memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
        clear_bit(dlm->node_num, mle->vote_map);
        clear_bit(dlm->node_num, mle->node_map);

        /* attach the mle to the domain node up/down events */
        __dlm_mle_attach_hb_events(dlm, mle);
}


/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen)
{
        struct dlm_master_list_entry *tmpmle;
        struct list_head *iter;

        assert_spin_locked(&dlm->master_lock);

        list_for_each(iter, &dlm->master_list) {
                tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
                if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
                        continue;
                dlm_get_mle(tmpmle);
                *mle = tmpmle;
                return 1;
        }
        return 0;
}

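/* propagate a node up/down event to every mle currently attached to
 * this domain's heartbeat events.  runs under dlm->spinlock. */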
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
        struct dlm_master_list_entry *mle;
        struct list_head *iter;

        assert_spin_locked(&dlm->spinlock);

        list_for_each(iter, &dlm->mle_hb_events) {
                mle = list_entry(iter, struct dlm_master_list_entry,
                                 hb_events);
                if (node_up)
                        dlm_mle_node_up(dlm, mle, NULL, idx);
                else
                        dlm_mle_node_down(dlm, mle, NULL, idx);
        }
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (!test_bit(idx, mle->node_map))
                mlog(0, "node %u already removed from nodemap!\n", idx);
        else
                clear_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (test_bit(idx, mle->node_map))
                mlog(0, "node %u already in node map!\n", idx);
        else
                set_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}


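/* set up the slab cache backing all mle allocations; presumably run
 * once at module init time.  returns -ENOMEM on failure. */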
int dlm_init_mle_cache(void)
{
        dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
                                          sizeof(struct dlm_master_list_entry),
                                          0, SLAB_HWCACHE_ALIGN,
                                          NULL, NULL);
        if (dlm_mle_cache == NULL)
                return -ENOMEM;
        return 0;
}

void dlm_destroy_mle_cache(void)
{
        if (dlm_mle_cache)
                kmem_cache_destroy(dlm_mle_cache);
}

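/* kref release function for an mle: unlink it from the master list,
 * detach it from heartbeat events and free it.  runs via kref_put
 * from __dlm_put_mle, so dlm->spinlock and dlm->master_lock are held. */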
static void dlm_mle_release(struct kref *kref)
{
        struct dlm_master_list_entry *mle;
        struct dlm_ctxt *dlm;

        mlog_entry_void();

        mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
        dlm = mle->dlm;

        if (mle->type != DLM_MLE_MASTER) {
                mlog(0, "calling mle_release for %.*s, type %d\n",
                     mle->u.name.len, mle->u.name.name, mle->type);
        } else {
                mlog(0, "calling mle_release for %.*s, type %d\n",
                     mle->u.res->lockname.len,
                     mle->u.res->lockname.name, mle->type);
        }
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        /* remove from list if not already */
        if (!list_empty(&mle->list))
                list_del_init(&mle->list);

        /* detach the mle from the domain node up/down events */
        __dlm_mle_detach_hb_events(dlm, mle);

        /* NOTE: kfree under spinlock here.
         * if this is bad, we can move this to a freelist. */
        kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

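/* set the owner of a lock resource, keeping the per-domain counts of
 * local/unknown/remote resources in sync.  caller holds res->spinlock. */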
static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *res,
                                  u8 owner)
{
        assert_spin_locked(&res->spinlock);

        mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

        if (owner == dlm->node_num)
                atomic_inc(&dlm->local_resources);
        else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
                atomic_inc(&dlm->unknown_resources);
        else
                atomic_inc(&dlm->remote_resources);

        res->owner = owner;
}

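/* like dlm_set_lockres_owner(), but first drops the accounting for
 * the previous owner so the counters stay balanced across a change. */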
void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res, u8 owner)
{
        assert_spin_locked(&res->spinlock);

        if (owner == res->owner)
                return;

        if (res->owner == dlm->node_num)
                atomic_dec(&dlm->local_resources);
        else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
                atomic_dec(&dlm->unknown_resources);
        else
                atomic_dec(&dlm->remote_resources);

        dlm_set_lockres_owner(dlm, res, owner);
}


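/* kref release function for a lock resource: by this point it must be
 * off every list, so just free the name buffer and the structure. */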
static void dlm_lockres_release(struct kref *kref)
{
        struct dlm_lock_resource *res;

        res = container_of(kref, struct dlm_lock_resource, refs);

        /* This should not happen -- all lockres' have a name
         * associated with them at init time. */
        BUG_ON(!res->lockname.name);

        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);

        /* By the time we're ready to blow this guy away, we shouldn't
         * be on any lists. */
        BUG_ON(!list_empty(&res->list));
        BUG_ON(!list_empty(&res->granted));
        BUG_ON(!list_empty(&res->converting));
        BUG_ON(!list_empty(&res->blocked));
        BUG_ON(!list_empty(&res->dirty));
        BUG_ON(!list_empty(&res->recovering));
        BUG_ON(!list_empty(&res->purge));

        kfree(res->lockname.name);

        kfree(res);
}

void dlm_lockres_get(struct dlm_lock_resource *res)
{
        kref_get(&res->refs);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
        kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
                             struct dlm_lock_resource *res,
                             const char *name, unsigned int namelen)
{
        char *qname;

        /* If we memset here, we lose our reference to the kmalloc'd
         * res->lockname.name, so be sure to init every field
         * correctly! */

        qname = (char *) res->lockname.name;
        memcpy(qname, name, namelen);

        res->lockname.len = namelen;
        res->lockname.hash = full_name_hash(name, namelen);

        init_waitqueue_head(&res->wq);
        spin_lock_init(&res->spinlock);
        INIT_LIST_HEAD(&res->list);
        INIT_LIST_HEAD(&res->granted);
        INIT_LIST_HEAD(&res->converting);
        INIT_LIST_HEAD(&res->blocked);
        INIT_LIST_HEAD(&res->dirty);
        INIT_LIST_HEAD(&res->recovering);
        INIT_LIST_HEAD(&res->purge);
        atomic_set(&res->asts_reserved, 0);
        res->migration_pending = 0;

        kref_init(&res->refs);

        /* just for consistency */
        spin_lock(&res->spinlock);
        dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
        spin_unlock(&res->spinlock);

        res->state = DLM_LOCK_RES_IN_PROGRESS;

        res->last_used = 0;

        memset(res->lvb, 0, DLM_LVB_LEN);
}

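/* allocate and initialize a new lock resource, including a private
 * copy of the lock name.  returns NULL if either allocation fails. */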
struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
                                   const char *name,
                                   unsigned int namelen)
{
        struct dlm_lock_resource *res;

        res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
        if (!res)
                return NULL;

        res->lockname.name = kmalloc(namelen, GFP_KERNEL);
        if (!res->lockname.name) {
                kfree(res);
                return NULL;
        }

        dlm_init_lockres(dlm, res, name, namelen);
        return res;
}

/*
 * lookup a lock resource by name.
 * may already exist in the hashtable.
 * lockid is null terminated
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.   need to wait around for that node
 * to assert_master (or die).
 *
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
                                          const char *lockid,
                                          int flags)
{
        struct dlm_lock_resource *tmpres=NULL, *res=NULL;
        struct dlm_master_list_entry *mle = NULL;
        struct dlm_master_list_entry *alloc_mle = NULL;
        int blocked = 0;
        int ret, nodenum;
        struct dlm_node_iter iter;
        unsigned int namelen;
        int tries = 0;

        BUG_ON(!lockid);

        namelen = strlen(lockid);

        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
        spin_lock(&dlm->spinlock);
        tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
        if (tmpres) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "found in hash!\n");
                if (res)
                        dlm_lockres_put(res);
                res = tmpres;
                goto leave;
        }

        if (!res) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "allocating a new resource\n");
                /* nothing found and we need to allocate one. */
                alloc_mle = (struct dlm_master_list_entry *)
                        kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
                if (!alloc_mle)
                        goto leave;
                res = dlm_new_lockres(dlm, lockid, namelen);
                if (!res)
                        goto leave;
                goto lookup;
        }

        mlog(0, "no lockres found, allocated our own: %p\n", res);

        if (flags & LKM_LOCAL) {
                /* caller knows it's safe to assume it's not mastered elsewhere
                 * DONE!  return right away */
                spin_lock(&res->spinlock);
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
                __dlm_insert_lockres(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
                /* lockres still marked IN_PROGRESS */
                goto wake_waiters;
        }

        /* check master list to see if another node has started mastering it */
        spin_lock(&dlm->master_lock);

        /* if we found a block, wait for lock to be mastered by another node */
        blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
        if (blocked) {
                if (mle->type == DLM_MLE_MASTER) {
                        mlog(ML_ERROR, "master entry for nonexistent lock!\n");
                        BUG();
                } else if (mle->type == DLM_MLE_MIGRATION) {
                        /* migration is in progress! */
                        /* the good news is that we now know the
                         * "current" master (mle->master). */

                        spin_unlock(&dlm->master_lock);
                        assert_spin_locked(&dlm->spinlock);

                        /* set the lockres owner and hash it */
                        spin_lock(&res->spinlock);
                        dlm_set_lockres_owner(dlm, res, mle->master);
                        __dlm_insert_lockres(dlm, res);
                        spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);

                        /* master is known, detach */
                        dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                        mle = NULL;
                        goto wake_waiters;
                }
        } else {
                /* go ahead and try to master lock on this node */
                mle = alloc_mle;
                /* make sure this does not get freed below */
                alloc_mle = NULL;
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map);
                list_add(&mle->list, &dlm->master_list);
        }

        /* at this point there is either a DLM_MLE_BLOCK or a
         * DLM_MLE_MASTER on the master list, so it's safe to add the
         * lockres to the hashtable.  anyone who finds the lock will
         * still have to wait on the IN_PROGRESS. */

        /* finally add the lockres to its hash bucket */
        __dlm_insert_lockres(dlm, res);
        /* get an extra ref on the mle in case this is a BLOCK
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
         * need an extra one to keep from a bad ptr deref. */
        dlm_get_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;

redo_request:
        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
                ret = dlm_do_master_request(mle, nodenum);
                if (ret < 0)
                        mlog_errno(ret);
                if (mle->master != O2NM_MAX_NODES) {
                        /* found a master ! */
                        break;
                }
        }

wait:
        /* keep going until the response map includes all nodes */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
                mlog(0, "%s:%.*s: node map changed, redo the "
                     "master request now, blocked=%d\n",
                     dlm->name, res->lockname.len,
                     res->lockname.name, blocked);
                if (++tries > 20) {
                        mlog(ML_ERROR, "%s:%.*s: spinning on "
                             "dlm_wait_for_lock_mastery, blocked=%d\n",
                             dlm->name, res->lockname.len,
                             res->lockname.name, blocked);
                        dlm_print_one_lock_resource(res);
                        /* dlm_print_one_mle(mle); */
                        tries = 0;
                }
                goto redo_request;
        }

        mlog(0, "lockres mastered by %u\n", res->owner);
        /* make sure we never continue without this */
        BUG_ON(res->owner == O2NM_MAX_NODES);

        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        /* put the extra ref */
        dlm_put_mle(mle);

wake_waiters:
        spin_lock(&res->spinlock);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);

leave:
        /* need to free the unused mle */
        if (alloc_mle)
                kmem_cache_free(dlm_mle_cache, alloc_mle);

        return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000

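/* wait until mastery of this lockres is resolved, either by another
 * node asserting master or by this node winning the election (lowest
 * node number in the maybe_map once all votes are in).  a negative
 * return tells the caller that the node map changed and the master
 * requests must be resent. */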
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked)
{
        u8 m;
        int ret, bit;
        int map_changed, voting_done;
        int assert, sleep;

recheck:
        ret = 0;
        assert = 0;

        /* check if another node has already become the owner */
        spin_lock(&res->spinlock);
        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
                spin_unlock(&res->spinlock);
                goto leave;
        }
        spin_unlock(&res->spinlock);

        spin_lock(&mle->spinlock);
        m = mle->master;
        map_changed = (memcmp(mle->vote_map, mle->node_map,
                              sizeof(mle->vote_map)) != 0);
        voting_done = (memcmp(mle->vote_map, mle->response_map,
                             sizeof(mle->vote_map)) == 0);

        /* restart if we hit any errors */
        if (map_changed) {
                int b;
                mlog(0, "%s: %.*s: node map changed, restarting\n",
                     dlm->name, res->lockname.len, res->lockname.name);
                ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
                b = (mle->type == DLM_MLE_BLOCK);
                if ((*blocked && !b) || (!*blocked && b)) {
                        mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
                             dlm->name, res->lockname.len, res->lockname.name,
                             *blocked, b);
                        *blocked = b;
                }
                spin_unlock(&mle->spinlock);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto leave;
                }
                mlog(0, "%s:%.*s: restart lock mastery succeeded, "
                     "rechecking now\n", dlm->name, res->lockname.len,
                     res->lockname.name);
                goto recheck;
        }

        if (m != O2NM_MAX_NODES) {
                /* another node has done an assert!
                 * all done! */
                sleep = 0;
        } else {
                sleep = 1;
                /* have all nodes responded? */
                if (voting_done && !*blocked) {
                        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
                        if (dlm->node_num <= bit) {
                                /* my node number is lowest.
                                 * now tell other nodes that I am
                                 * mastering this. */
                                mle->master = dlm->node_num;
                                assert = 1;
                                sleep = 0;
                        }
                        /* if voting is done, but we have not received
                         * an assert master yet, we must sleep */
                }
        }

        spin_unlock(&mle->spinlock);

        /* sleep if we haven't finished voting yet */
        if (sleep) {
                unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

                /*
                if (atomic_read(&mle->mle_refs.refcount) < 2)
                        mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
                        atomic_read(&mle->mle_refs.refcount),
                        res->lockname.len, res->lockname.name);
                */
                atomic_set(&mle->woken, 0);
                (void)wait_event_timeout(mle->wq,
                                         (atomic_read(&mle->woken) == 1),
                                         timeo);
                if (res->owner == O2NM_MAX_NODES) {
                        mlog(0, "waiting again\n");
                        goto recheck;
                }
                mlog(0, "done waiting, master is %u\n", res->owner);
                ret = 0;
                goto leave;
        }

        ret = 0;   /* done */
        if (assert) {
                m = dlm->node_num;
                mlog(0, "about to master %.*s here, this=%u\n",
                     res->lockname.len, res->lockname.name, m);
                ret = dlm_do_assert_master(dlm, res->lockname.name,
                                           res->lockname.len, mle->vote_map, 0);
                if (ret) {
                        /* This is a failure in the network path,
                         * not in the response to the assert_master
                         * (any nonzero response is a BUG on this node).
                         * Most likely a socket just got disconnected
                         * due to node death. */
                        mlog_errno(ret);
                }
                /* no longer need to restart lock mastery.
                 * all living nodes have been contacted. */
                ret = 0;
        }

        /* set the lockres owner */
        spin_lock(&res->spinlock);
        dlm_change_lockres_owner(dlm, res, m);
        spin_unlock(&res->spinlock);

leave:
        return ret;
}

struct dlm_bitmap_diff_iter
{
        int curnode;
        unsigned long *orig_bm;
        unsigned long *cur_bm;
        unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
        NODE_DOWN = -1,
        NODE_NO_CHANGE = 0,
        NODE_UP
};

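/* set up a diff iterator over two node bitmaps.  diff_bm is the XOR
 * of the original and current maps, so walking its set bits yields
 * exactly the nodes whose state changed. */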
static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
                                      unsigned long *orig_bm,
                                      unsigned long *cur_bm)
{
        unsigned long p1, p2;
        int i;

        iter->curnode = -1;
        iter->orig_bm = orig_bm;
        iter->cur_bm = cur_bm;

        for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
                p1 = *(iter->orig_bm + i);
                p2 = *(iter->cur_bm + i);
                iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
        }
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
                                     enum dlm_node_state_change *state)
{
        int bit;

        if (iter->curnode >= O2NM_MAX_NODES)
                return -ENOENT;

        bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
                            iter->curnode+1);
        if (bit >= O2NM_MAX_NODES) {
                iter->curnode = O2NM_MAX_NODES;
                return -ENOENT;
        }

        /* if it was there in the original then this node died */
        if (test_bit(bit, iter->orig_bm))
                *state = NODE_DOWN;
        else
                *state = NODE_UP;

        iter->curnode = bit;
        return bit;
}


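/* adjust an in-flight mastery election after the node map changed:
 * newly arrived nodes are added to the vote, dead nodes are cleared
 * out of the maps, and a block entry whose last candidate master died
 * is converted to a master entry.  returns -EAGAIN if the caller must
 * resend master requests, 0 if nothing relevant changed. */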
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked)
{
        struct dlm_bitmap_diff_iter bdi;
        enum dlm_node_state_change sc;
        int node;
        int ret = 0;

        mlog(0, "something happened such that the "
             "master process may need to be restarted!\n");

        assert_spin_locked(&mle->spinlock);

        dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
        node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        while (node >= 0) {
                if (sc == NODE_UP) {
                        /* a node came up.  easy.  might not even need
                         * to talk to it if its node number is higher
                         * or if we are already blocked. */
                        mlog(0, "node up! %d\n", node);
                        if (blocked)
                                goto next;

                        if (node > dlm->node_num) {
                                mlog(0, "node > this node. skipping.\n");
                                goto next;
                        }

                        /* redo the master request, but only for the new node */
                        mlog(0, "sending request to new node\n");
                        clear_bit(node, mle->response_map);
                        set_bit(node, mle->vote_map);
                } else {
                        mlog(ML_ERROR, "node down! %d\n", node);

                        /* if the node wasn't involved in mastery skip it,
                         * but clear it out from the maps so that it will
                         * not affect mastery of this lockres */
                        clear_bit(node, mle->response_map);
                        clear_bit(node, mle->vote_map);
                        if (!test_bit(node, mle->maybe_map))
                                goto next;

                        /* if we're already blocked on lock mastery, and the
                         * dead node wasn't the expected master, or there is
                         * another node in the maybe_map, keep waiting */
                        if (blocked) {
                                int lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES, 0);

                                /* act like it was never there */
                                clear_bit(node, mle->maybe_map);

                                if (node != lowest)
                                        goto next;

                                mlog(ML_ERROR, "expected master %u died while "
                                     "this node was blocked waiting on it!\n",
                                     node);
                                lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES,
                                                       lowest+1);
                                if (lowest < O2NM_MAX_NODES) {
                                        mlog(0, "still blocked. waiting "
                                             "on %u now\n", lowest);
                                        goto next;
                                }

                                /* mle is an MLE_BLOCK, but there is now
                                 * nothing left to block on.  we need to return
                                 * all the way back out and try again with
                                 * an MLE_MASTER. dlm_do_local_recovery_cleanup
                                 * has already run, so the mle refcount is ok */
                                mlog(0, "no longer blocking. we can "
                                     "try to master this here\n");
                                mle->type = DLM_MLE_MASTER;
                                memset(mle->maybe_map, 0,
                                       sizeof(mle->maybe_map));
                                memset(mle->response_map, 0,
                                       sizeof(mle->response_map));
                                memcpy(mle->vote_map, mle->node_map,
                                       sizeof(mle->node_map));
                                mle->u.res = res;
                                set_bit(dlm->node_num, mle->maybe_map);

                                ret = -EAGAIN;
                                goto next;
                        }

                        clear_bit(node, mle->maybe_map);
                        if (node > dlm->node_num)
                                goto next;

                        mlog(0, "dead node in map!\n");
                        /* yuck. go back and re-contact all nodes
                         * in the vote_map, removing this node. */
                        memset(mle->response_map, 0,
                               sizeof(mle->response_map));
                }
                ret = -EAGAIN;
next:
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        }
        return ret;
}
1144
1145
1146 /*
1147  * DLM_MASTER_REQUEST_MSG
1148  *
1149  * returns: 0 on success,
1150  *          -errno on a network error
1151  *
1152  * on error, the caller should assume the target node is "dead"
1153  *
1154  */
1155
1156 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
1157 {
1158         struct dlm_ctxt *dlm = mle->dlm;
1159         struct dlm_master_request request;
1160         int ret, response=0, resend;
1161
1162         memset(&request, 0, sizeof(request));
1163         request.node_idx = dlm->node_num;
1164
1165         BUG_ON(mle->type == DLM_MLE_MIGRATION);
1166
1167         if (mle->type != DLM_MLE_MASTER) {
1168                 request.namelen = mle->u.name.len;
1169                 memcpy(request.name, mle->u.name.name, request.namelen);
1170         } else {
1171                 request.namelen = mle->u.res->lockname.len;
1172                 memcpy(request.name, mle->u.res->lockname.name,
1173                         request.namelen);
1174         }
1175
1176 again:
1177         ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1178                                  sizeof(request), to, &response);
1179         if (ret < 0)  {
1180                 if (ret == -ESRCH) {
1181                         /* should never happen */
1182                         mlog(ML_ERROR, "TCP stack not ready!\n");
1183                         BUG();
1184                 } else if (ret == -EINVAL) {
1185                         mlog(ML_ERROR, "bad args passed to o2net!\n");
1186                         BUG();
1187                 } else if (ret == -ENOMEM) {
1188                         mlog(ML_ERROR, "out of memory while trying to send "
1189                              "network message!  retrying\n");
1190                         /* this is totally crude */
1191                         msleep(50);
1192                         goto again;
1193                 } else if (!dlm_is_host_down(ret)) {
1194                         /* not a network error. bad. */
1195                         mlog_errno(ret);
1196                         mlog(ML_ERROR, "unhandled error!");
1197                         BUG();
1198                 }
1199                 /* all other errors should be network errors,
1200                  * and likely indicate node death */
1201                 mlog(ML_ERROR, "link to %d went down!\n", to);
1202                 goto out;
1203         }
1204
1205         ret = 0;
1206         resend = 0;
1207         spin_lock(&mle->spinlock);
1208         switch (response) {
1209                 case DLM_MASTER_RESP_YES:
1210                         set_bit(to, mle->response_map);
1211                         mlog(0, "node %u is the master, response=YES\n", to);
1212                         mle->master = to;
1213                         break;
1214                 case DLM_MASTER_RESP_NO:
1215                         mlog(0, "node %u not master, response=NO\n", to);
1216                         set_bit(to, mle->response_map);
1217                         break;
1218                 case DLM_MASTER_RESP_MAYBE:
1219                         mlog(0, "node %u not master, response=MAYBE\n", to);
1220                         set_bit(to, mle->response_map);
1221                         set_bit(to, mle->maybe_map);
1222                         break;
1223                 case DLM_MASTER_RESP_ERROR:
1224                         mlog(0, "node %u hit an error, resending\n", to);
1225                         resend = 1;
1226                         response = 0;
1227                         break;
1228                 default:
1229                         mlog(ML_ERROR, "bad response! %u\n", response);
1230                         BUG();
1231         }
1232         spin_unlock(&mle->spinlock);
1233         if (resend) {
1234                 /* this is also totally crude */
1235                 msleep(50);
1236                 goto again;
1237         }
1238
1239 out:
1240         return ret;
1241 }
1242
1243 /*
1244  * locks that can be taken here:
1245  * dlm->spinlock
1246  * res->spinlock
1247  * mle->spinlock
1248  * dlm->master_list
1249  *
1250  * if possible, TRIM THIS DOWN!!!
1251  */
1252 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1253 {
1254         u8 response = DLM_MASTER_RESP_MAYBE;
1255         struct dlm_ctxt *dlm = data;
1256         struct dlm_lock_resource *res;
1257         struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1258         struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1259         char *name;
1260         unsigned int namelen;
1261         int found, ret;
1262         int set_maybe;
1263
1264         if (!dlm_grab(dlm))
1265                 return DLM_MASTER_RESP_NO;
1266
1267         if (!dlm_domain_fully_joined(dlm)) {
1268                 response = DLM_MASTER_RESP_NO;
1269                 goto send_response;
1270         }
1271
1272         name = request->name;
1273         namelen = request->namelen;
1274
1275         if (namelen > DLM_LOCKID_NAME_MAX) {
1276                 response = DLM_IVBUFLEN;
1277                 goto send_response;
1278         }
1279
1280 way_up_top:
1281         spin_lock(&dlm->spinlock);
1282         res = __dlm_lookup_lockres(dlm, name, namelen);
1283         if (res) {
1284                 spin_unlock(&dlm->spinlock);
1285
1286                 /* take care of the easy cases up front */
1287                 spin_lock(&res->spinlock);
1288                 if (res->state & DLM_LOCK_RES_RECOVERING) {
1289                         spin_unlock(&res->spinlock);
1290                         mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1291                              "being recovered\n");
1292                         response = DLM_MASTER_RESP_ERROR;
1293                         if (mle)
1294                                 kmem_cache_free(dlm_mle_cache, mle);
1295                         goto send_response;
1296                 }
1297
1298                 if (res->owner == dlm->node_num) {
1299                         u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
1300                         spin_unlock(&res->spinlock);
1301                         // mlog(0, "this node is the master\n");
1302                         response = DLM_MASTER_RESP_YES;
1303                         if (mle)
1304                                 kmem_cache_free(dlm_mle_cache, mle);
1305
1306                         /* this node is the owner.
1307                          * there is some extra work that needs to
1308                          * happen now.  the requesting node has
1309                          * caused all nodes up to this one to
1310                          * create mles.  this node now needs to
1311                          * go back and clean those up. */
1312                         mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1313                              dlm->node_num, res->lockname.len, res->lockname.name);
1314                         ret = dlm_dispatch_assert_master(dlm, res, 1,
1315                                                          request->node_idx,
1316                                                          flags);
1317                         if (ret < 0) {
1318                                 mlog(ML_ERROR, "failed to dispatch assert "
1319                                      "master work\n");
1320                                 response = DLM_MASTER_RESP_ERROR;
1321                         }
1322                         goto send_response;
1323                 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1324                         spin_unlock(&res->spinlock);
1325                         // mlog(0, "node %u is the master\n", res->owner);
1326                         response = DLM_MASTER_RESP_NO;
1327                         if (mle)
1328                                 kmem_cache_free(dlm_mle_cache, mle);
1329                         goto send_response;
1330                 }
1331
1332                 /* ok, there is no owner.  either this node is
1333                  * being blocked, or it is actively trying to
1334                  * master this lock. */
1335                 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1336                         mlog(ML_ERROR, "lock with no owner should be "
1337                              "in-progress!\n");
1338                         BUG();
1339                 }
1340
1341                 // mlog(0, "lockres is in progress...\n");
1342                 spin_lock(&dlm->master_lock);
1343                 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1344                 if (!found) {
1345                         mlog(ML_ERROR, "no mle found for this lock!\n");
1346                         BUG();
1347                 }
1348                 set_maybe = 1;
1349                 spin_lock(&tmpmle->spinlock);
1350                 if (tmpmle->type == DLM_MLE_BLOCK) {
1351                         // mlog(0, "this node is waiting for "
1352                         // "lockres to be mastered\n");
1353                         response = DLM_MASTER_RESP_NO;
1354                 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1355                         mlog(0, "node %u is master, but trying to migrate to "
1356                              "node %u.\n", tmpmle->master, tmpmle->new_master);
1357                         if (tmpmle->master == dlm->node_num) {
1358                                 response = DLM_MASTER_RESP_YES;
1359                                 mlog(ML_ERROR, "no owner on lockres, but this "
1360                                      "node is trying to migrate it to %u?!\n",
1361                                      tmpmle->new_master);
1362                                 BUG();
1363                         } else {
1364                                 /* the real master can respond on its own */
1365                                 response = DLM_MASTER_RESP_NO;
1366                         }
1367                 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1368                         set_maybe = 0;
1369                         if (tmpmle->master == dlm->node_num)
1370                                 response = DLM_MASTER_RESP_YES;
1371                         else
1372                                 response = DLM_MASTER_RESP_NO;
1373                 } else {
1374                         // mlog(0, "this node is attempting to "
1375                         // "master lockres\n");
1376                         response = DLM_MASTER_RESP_MAYBE;
1377                 }
1378                 if (set_maybe)
1379                         set_bit(request->node_idx, tmpmle->maybe_map);
1380                 spin_unlock(&tmpmle->spinlock);
1381
1382                 spin_unlock(&dlm->master_lock);
1383                 spin_unlock(&res->spinlock);
1384
1385                 /* keep the mle attached to heartbeat events */
1386                 dlm_put_mle(tmpmle);
1387                 if (mle)
1388                         kmem_cache_free(dlm_mle_cache, mle);
1389                 goto send_response;
1390         }
1391
1392         /*
1393          * lockres doesn't exist on this node
1394          * if there is an MLE_BLOCK, return NO
1395          * if there is an MLE_MASTER, return MAYBE
1396          * otherwise, add an MLE_BLOCK, return NO
1397          */
1398         spin_lock(&dlm->master_lock);
1399         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1400         if (!found) {
1401                 /* this lockid has never been seen on this node yet */
1402                 // mlog(0, "no mle found\n");
1403                 if (!mle) {
1404                         spin_unlock(&dlm->master_lock);
1405                         spin_unlock(&dlm->spinlock);
1406
1407                         mle = (struct dlm_master_list_entry *)
1408                                 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
1409                         if (!mle) {
1410                                 // bad bad bad... this sucks.
1411                                 response = DLM_MASTER_RESP_ERROR;
1412                                 goto send_response;
1413                         }
1414                         spin_lock(&dlm->spinlock);
1415                         dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
1416                                          name, namelen);
1417                         spin_unlock(&dlm->spinlock);
1418                         goto way_up_top;
1419                 }
1420
1421                 /* second time through: the mle was already allocated; add the block */
1423                 set_bit(request->node_idx, mle->maybe_map);
1424                 list_add(&mle->list, &dlm->master_list);
1425                 response = DLM_MASTER_RESP_NO;
1426         } else {
1427                 /* an mle already exists for this lockid */
1428                 set_maybe = 1;
1429                 spin_lock(&tmpmle->spinlock);
1430                 if (tmpmle->type == DLM_MLE_BLOCK)
1431                         response = DLM_MASTER_RESP_NO;
1432                 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1433                         mlog(0, "migration mle was found (%u->%u)\n",
1434                              tmpmle->master, tmpmle->new_master);
1435                         if (tmpmle->master == dlm->node_num) {
1436                                 mlog(ML_ERROR, "no lockres, but migration mle "
1437                                      "says that this node is master!\n");
1438                                 BUG();
1439                         }
1440                         /* real master can respond on its own */
1441                         response = DLM_MASTER_RESP_NO;
1442                 } else {
1443                         if (tmpmle->master == dlm->node_num) {
1444                                 response = DLM_MASTER_RESP_YES;
1445                                 set_maybe = 0;
1446                         } else
1447                                 response = DLM_MASTER_RESP_MAYBE;
1448                 }
1449                 if (set_maybe)
1450                         set_bit(request->node_idx, tmpmle->maybe_map);
1451                 spin_unlock(&tmpmle->spinlock);
1452         }
1453         spin_unlock(&dlm->master_lock);
1454         spin_unlock(&dlm->spinlock);
1455
1456         if (found) {
1457                 /* keep the mle attached to heartbeat events */
1458                 dlm_put_mle(tmpmle);
1459         }
1460 send_response:
1461         dlm_put(dlm);
1462         return response;
1463 }
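
/*
 * Summary of the response matrix implemented above:
 *
 *   DLM_MASTER_RESP_YES   - this node is the master of the lockres
 *   DLM_MASTER_RESP_NO    - this node is not the master and is not trying
 *                           to become it (blocked, or the real master can
 *                           answer for itself)
 *   DLM_MASTER_RESP_MAYBE - this node is itself still trying to master
 *                           the lockres
 *   DLM_MASTER_RESP_ERROR - this node could not allocate an mle to track
 *                           the request
 */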
1464
1465 /*
1466  * DLM_ASSERT_MASTER_MSG
1467  */
1468
1469
1470 /*
1471  * NOTE: this can be used for debugging
1472  * can periodically run all locks owned by this node
1473  * and re-assert across the cluster...
1474  */
1475 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1476                                 unsigned int namelen, void *nodemap,
1477                                 u32 flags)
1478 {
1479         struct dlm_assert_master assert;
1480         int to, tmpret;
1481         struct dlm_node_iter iter;
1482         int ret = 0;
1483
1484         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1485
1486         /* note that if the nodemap is empty, the loop below never runs and we return 0 */
1487         dlm_node_iter_init(nodemap, &iter);
1488         while ((to = dlm_node_iter_next(&iter)) >= 0) {
1489                 int r = 0;
1490                 mlog(0, "sending assert master to %d (%.*s)\n", to,
1491                      namelen, lockname);
1492                 memset(&assert, 0, sizeof(assert));
1493                 assert.node_idx = dlm->node_num;
1494                 assert.namelen = namelen;
1495                 memcpy(assert.name, lockname, namelen);
1496                 assert.flags = cpu_to_be32(flags);
1497
1498                 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1499                                             &assert, sizeof(assert), to, &r);
1500                 if (tmpret < 0) {
1501                         mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
1502                         if (!dlm_is_host_down(tmpret)) {
1503                                 mlog(ML_ERROR, "unhandled error!\n");
1504                                 BUG();
1505                         }
1506                         /* a node died.  finish out the rest of the nodes. */
1507                         mlog(ML_ERROR, "link to %d went down!\n", to);
1508                         /* any nonzero status return will do */
1509                         ret = tmpret;
1510                 } else if (r < 0) {
1511                         /* something has gone horribly wrong.  kill thyself. */
1512                         mlog(ML_ERROR, "during assert master of %.*s to %u, "
1513                              "got %d.\n", namelen, lockname, to, r);
1514                         dlm_dump_lock_resources(dlm);
1515                         BUG();
1516                 }
1517         }
1518
1519         return ret;
1520 }
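
/*
 * A minimal usage sketch (hypothetical helper, not part of this file):
 * re-assert mastery of a lockres this node owns to every other live node,
 * mirroring what dlm_assert_master_worker() does below.  A flags value of
 * 0 is assumed to mean a plain assert with no cleanup/migration semantics.
 */
static int dlm_reassert_master_sketch(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res)
{
        unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];

        /* snapshot the live nodes under the dlm spinlock */
        spin_lock(&dlm->spinlock);
        memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
        spin_unlock(&dlm->spinlock);

        /* never send the assert to ourselves */
        clear_bit(dlm->node_num, nodemap);

        return dlm_do_assert_master(dlm, res->lockname.name,
                                    res->lockname.len, nodemap, 0);
}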
1521
1522 /*
1523  * locks that can be taken here:
1524  * dlm->spinlock
1525  * res->spinlock
1526  * mle->spinlock
1527  * dlm->master_lock
1528  *
1529  * if possible, TRIM THIS DOWN!!!
1530  */
1531 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1532 {
1533         struct dlm_ctxt *dlm = data;
1534         struct dlm_master_list_entry *mle = NULL;
1535         struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1536         struct dlm_lock_resource *res = NULL;
1537         char *name;
1538         unsigned int namelen;
1539         u32 flags;
1540
1541         if (!dlm_grab(dlm))
1542                 return 0;
1543
1544         name = assert->name;
1545         namelen = assert->namelen;
1546         flags = be32_to_cpu(assert->flags);
1547
1548         if (namelen > DLM_LOCKID_NAME_MAX) {
1549                 mlog(ML_ERROR, "Invalid name length!\n");
1550                 goto done;
1551         }
1552
1553         spin_lock(&dlm->spinlock);
1554
1555         if (flags)
1556                 mlog(0, "assert_master with flags: %u\n", flags);
1557
1558         /* find the MLE */
1559         spin_lock(&dlm->master_lock);
1560         if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1561                 /* not an error, could be master just re-asserting */
1562                 mlog(0, "just got an assert_master from %u, but no "
1563                      "MLE for it! (%.*s)\n", assert->node_idx,
1564                      namelen, name);
1565         } else {
1566                 int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1567                 if (bit >= O2NM_MAX_NODES) {
1568                         /* not necessarily an error, though less likely.
1569                          * could be master just re-asserting. */
1570                         mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
1571                              "is asserting! (%.*s)\n", assert->node_idx,
1572                              namelen, name);
1573                 } else if (bit != assert->node_idx) {
1574                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1575                                 mlog(0, "master %u was found, %u should "
1576                                      "back off\n", assert->node_idx, bit);
1577                         } else {
1578                                 /* with the fix for bug 569, a higher node
1579                                  * number winning the mastery will respond
1580                                  * YES to mastery requests, but this node
1581                                  * had no way of knowing.  let it pass. */
1582                                 mlog(ML_ERROR, "%u is the lowest node, "
1583                                      "%u is asserting. (%.*s)  %u must "
1584                                      "have begun after %u won.\n", bit,
1585                                      assert->node_idx, namelen, name, bit,
1586                                      assert->node_idx);
1587                         }
1588                 }
1589         }
1590         spin_unlock(&dlm->master_lock);
1591
1592         /* ok everything checks out with the MLE
1593          * now check to see if there is a lockres */
1594         res = __dlm_lookup_lockres(dlm, name, namelen);
1595         if (res) {
1596                 spin_lock(&res->spinlock);
1597                 if (res->state & DLM_LOCK_RES_RECOVERING)  {
1598                         mlog(ML_ERROR, "%u asserting but %.*s is "
1599                              "RECOVERING!\n", assert->node_idx, namelen, name);
1600                         goto kill;
1601                 }
1602                 if (!mle) {
1603                         if (res->owner != assert->node_idx) {
1604                                 mlog(ML_ERROR, "assert_master from "
1605                                           "%u, but current owner is "
1606                                           "%u! (%.*s)\n",
1607                                        assert->node_idx, res->owner,
1608                                        namelen, name);
1609                                 goto kill;
1610                         }
1611                 } else if (mle->type != DLM_MLE_MIGRATION) {
1612                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1613                                 /* owner is just re-asserting */
1614                                 if (res->owner == assert->node_idx) {
1615                                         mlog(0, "owner %u re-asserting on "
1616                                              "lock %.*s\n", assert->node_idx,
1617                                              namelen, name);
1618                                         goto ok;
1619                                 }
1620                                 mlog(ML_ERROR, "got assert_master from "
1621                                      "node %u, but %u is the owner! "
1622                                      "(%.*s)\n", assert->node_idx,
1623                                      res->owner, namelen, name);
1624                                 goto kill;
1625                         }
1626                         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1627                                 mlog(ML_ERROR, "got assert from %u, but lock "
1628                                      "with no owner should be "
1629                                      "in-progress! (%.*s)\n",
1630                                      assert->node_idx,
1631                                      namelen, name);
1632                                 goto kill;
1633                         }
1634                 } else /* mle->type == DLM_MLE_MIGRATION */ {
1635                         /* should only be getting an assert from new master */
1636                         if (assert->node_idx != mle->new_master) {
1637                                 mlog(ML_ERROR, "got assert from %u, but "
1638                                      "new master is %u, and old master "
1639                                      "was %u (%.*s)\n",
1640                                      assert->node_idx, mle->new_master,
1641                                      mle->master, namelen, name);
1642                                 goto kill;
1643                         }
1644
1645                 }
1646 ok:
1647                 spin_unlock(&res->spinlock);
1648         }
1649         spin_unlock(&dlm->spinlock);
1650
1651         /* the assert checked out: record the master and wake any waiters */
1653         if (mle) {
1654                 int extra_ref;
1655
1656                 spin_lock(&mle->spinlock);
1657                 extra_ref = !!(mle->type == DLM_MLE_BLOCK
1658                                || mle->type == DLM_MLE_MIGRATION);
1659                 mle->master = assert->node_idx;
1660                 atomic_set(&mle->woken, 1);
1661                 wake_up(&mle->wq);
1662                 spin_unlock(&mle->spinlock);
1663
1664                 if (mle->type == DLM_MLE_MIGRATION && res) {
1665                         mlog(0, "finishing off migration of lockres %.*s, "
1666                              "from %u to %u\n",
1667                                res->lockname.len, res->lockname.name,
1668                                dlm->node_num, mle->new_master);
1669                         spin_lock(&res->spinlock);
1670                         res->state &= ~DLM_LOCK_RES_MIGRATING;
1671                         dlm_change_lockres_owner(dlm, res, mle->new_master);
1672                         BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1673                         spin_unlock(&res->spinlock);
1674                 }
1675                 /* master is known, detach if not already detached */
1676                 dlm_mle_detach_hb_events(dlm, mle);
1677                 dlm_put_mle(mle);
1678
1679                 if (extra_ref) {
1680                         /* the assert master message now balances the extra
1681                          * ref given by the master / migration request message.
1682                          * if this is the last put, it will be removed
1683                          * from the list. */
1684                         dlm_put_mle(mle);
1685                 }
1686         }
1687
1688 done:
1689         if (res)
1690                 dlm_lockres_put(res);
1691         dlm_put(dlm);
1692         return 0;
1693
1694 kill:
1695         /* kill the caller! */
1696         spin_unlock(&res->spinlock);
1697         spin_unlock(&dlm->spinlock);
1698         dlm_lockres_put(res);
1699         mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
1700              "and killing the other node now!  This node is OK and can continue.\n");
1701         dlm_dump_lock_resources(dlm);
1702         dlm_put(dlm);
1703         return -EINVAL;
1704 }
1705
1706 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1707                                struct dlm_lock_resource *res,
1708                                int ignore_higher, u8 request_from, u32 flags)
1709 {
1710         struct dlm_work_item *item;
1711         item = kcalloc(1, sizeof(*item), GFP_KERNEL);
1712         if (!item)
1713                 return -ENOMEM;
1714
1716         /* queue up work for dlm_assert_master_worker */
1717         dlm_grab(dlm);  /* get an extra ref for the work item */
1718         dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
1719         item->u.am.lockres = res; /* already have a ref */
1720         /* can optionally ignore node numbers higher than this node */
1721         item->u.am.ignore_higher = ignore_higher;
1722         item->u.am.request_from = request_from;
1723         item->u.am.flags = flags;
1724
1725         spin_lock(&dlm->work_lock);
1726         list_add_tail(&item->list, &dlm->work_list);
1727         spin_unlock(&dlm->work_lock);
1728
1729         schedule_work(&dlm->dispatched_work);
1730         return 0;
1731 }
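
/*
 * Note on reference ownership in the dispatch above: the work item carries
 * the extra dlm ref taken with dlm_grab() as well as the caller's ref on
 * the lockres.  dlm_assert_master_worker() below drops the lockres ref when
 * it is done; the generic dispatcher that drains dlm->work_list is assumed
 * to drop the dlm ref and free the item.
 */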
1732
1733 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1734 {
1735         struct dlm_ctxt *dlm = data;
1736         int ret = 0;
1737         struct dlm_lock_resource *res;
1738         unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1739         int ignore_higher;
1740         int bit;
1741         u8 request_from;
1742         u32 flags;
1743
1744         dlm = item->dlm;
1745         res = item->u.am.lockres;
1746         ignore_higher = item->u.am.ignore_higher;
1747         request_from = item->u.am.request_from;
1748         flags = item->u.am.flags;
1749
1750         spin_lock(&dlm->spinlock);
1751         memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
1752         spin_unlock(&dlm->spinlock);
1753
1754         clear_bit(dlm->node_num, nodemap);
1755         if (ignore_higher) {
1756                 /* if this is just to clear up mles for nodes below
1757                  * this node, do not send the message to the original
1758                  * caller or any node number higher than this */
1759                 clear_bit(request_from, nodemap);
1760                 bit = dlm->node_num;
1761                 while (1) {
1762                         bit = find_next_bit(nodemap, O2NM_MAX_NODES,
1763                                             bit+1);
1764                         if (bit >= O2NM_MAX_NODES)
1765                                 break;
1766                         clear_bit(bit, nodemap);
1767                 }
1768         }
1769
1770         /* this call now finishes out the nodemap
1771          * even if one or more nodes die */
1772         mlog(0, "worker about to master %.*s here, this=%u\n",
1773                      res->lockname.len, res->lockname.name, dlm->node_num);
1774         ret = dlm_do_assert_master(dlm, res->lockname.name,
1775                                    res->lockname.len,
1776                                    nodemap, flags);
1777         if (ret < 0) {
1778                 /* no need to restart, we are done */
1779                 mlog_errno(ret);
1780         }
1781
1782         dlm_lockres_put(res);
1783
1784         mlog(0, "finished with dlm_assert_master_worker\n");
1785 }
1786
1787
1788 /*
1789  * DLM_MIGRATE_LOCKRES
1790  */
1791
1792
1793 int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1794                         u8 target)
1795 {
1796         struct dlm_master_list_entry *mle = NULL;
1797         struct dlm_master_list_entry *oldmle = NULL;
1798         struct dlm_migratable_lockres *mres = NULL;
1799         int ret = -EINVAL;
1800         const char *name;
1801         unsigned int namelen;
1802         int mle_added = 0;
1803         struct list_head *queue, *iter;
1804         int i;
1805         struct dlm_lock *lock;
1806         int empty = 1;
1807
1808         if (!dlm_grab(dlm))
1809                 return -EINVAL;
1810
1811         name = res->lockname.name;
1812         namelen = res->lockname.len;
1813
1814         mlog(0, "migrating %.*s to %u\n", namelen, name, target);
1815
1816         /*
1817          * ensure this lockres is a proper candidate for migration
1818          */
1819         spin_lock(&res->spinlock);
1820         if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
1821                 mlog(0, "cannot migrate lockres with unknown owner!\n");
1822                 spin_unlock(&res->spinlock);
1823                 goto leave;
1824         }
1825         if (res->owner != dlm->node_num) {
1826                 mlog(0, "cannot migrate lockres this node doesn't own!\n");
1827                 spin_unlock(&res->spinlock);
1828                 goto leave;
1829         }
1830         mlog(0, "checking queues...\n");
1831         queue = &res->granted;
1832         for (i=0; i<3; i++) {
1833                 list_for_each(iter, queue) {
1834                         lock = list_entry(iter, struct dlm_lock, list);
1835                         empty = 0;
1836                         if (lock->ml.node == dlm->node_num) {
1837                                 mlog(0, "found a lock owned by this node "
1838                                      "still on the %s queue!  will not "
1839                                      "migrate this lockres\n",
1840                                      i==0 ? "granted" :
1841                                      (i==1 ? "converting" : "blocked"));
1842                                 spin_unlock(&res->spinlock);
1843                                 ret = -ENOTEMPTY;
1844                                 goto leave;
1845                         }
1846                 }
1847                 queue++;
1848         }
1849         mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
1850         spin_unlock(&res->spinlock);
1851
1852         /* no work to do */
1853         if (empty) {
1854                 mlog(0, "no locks were found on this lockres! done!\n");
1855                 ret = 0;
1856                 goto leave;
1857         }
1858
1859         /*
1860          * preallocate up front
1861          * if this fails, abort
1862          */
1863
1864         ret = -ENOMEM;
1865         mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
1866         if (!mres) {
1867                 mlog_errno(ret);
1868                 goto leave;
1869         }
1870
1871         mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
1872                                                                 GFP_KERNEL);
1873         if (!mle) {
1874                 mlog_errno(ret);
1875                 goto leave;
1876         }
1877         ret = 0;
1878
1879         /*
1880          * find a node to migrate the lockres to
1881          */
1882
1883         mlog(0, "picking a migration node\n");
1884         spin_lock(&dlm->spinlock);
1885         /* pick a new node */
1886         if (target >= O2NM_MAX_NODES ||
1887             !test_bit(target, dlm->domain_map)) {
1888                 target = dlm_pick_migration_target(dlm, res);
1889         }
1890         mlog(0, "node %u chosen for migration\n", target);
1891
1892         if (target >= O2NM_MAX_NODES ||
1893             !test_bit(target, dlm->domain_map)) {
1894                 /* target chosen is not alive */
1895                 ret = -EINVAL;
1896         }
1897
1898         if (ret) {
1899                 spin_unlock(&dlm->spinlock);
1900                 goto fail;
1901         }
1902
1903         mlog(0, "continuing with target = %u\n", target);
1904
1905         /*
1906          * clear any existing master requests and
1907          * add the migration mle to the list
1908          */
1909         spin_lock(&dlm->master_lock);
1910         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
1911                                     namelen, target, dlm->node_num);
1912         spin_unlock(&dlm->master_lock);
1913         spin_unlock(&dlm->spinlock);
1914
1915         if (ret == -EEXIST) {
1916                 mlog(0, "another process is already migrating it\n");
1917                 goto fail;
1918         }
1919         mle_added = 1;
1920
1921         /*
1922          * set the MIGRATING flag and flush asts
1923          * if we fail after this we need to re-dirty the lockres
1924          */
1925         if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
1926                 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
1927                      "the target went down.\n", res->lockname.len,
1928                      res->lockname.name, target);
1929                 spin_lock(&res->spinlock);
1930                 res->state &= ~DLM_LOCK_RES_MIGRATING;
1931                 spin_unlock(&res->spinlock);
1932                 ret = -EINVAL;
1933         }
1934
1935 fail:
1936         if (oldmle) {
1937                 /* master is known, detach if not already detached */
1938                 dlm_mle_detach_hb_events(dlm, oldmle);
1939                 dlm_put_mle(oldmle);
1940         }
1941
1942         if (ret < 0) {
1943                 if (mle_added) {
1944                         dlm_mle_detach_hb_events(dlm, mle);
1945                         dlm_put_mle(mle);
1946                 } else if (mle) {
1947                         kmem_cache_free(dlm_mle_cache, mle);
1948                 }
1949                 goto leave;
1950         }
1951
1952         /*
1953          * at this point, we have a migration target, an mle
1954          * in the master list, and the MIGRATING flag set on
1955          * the lockres
1956          */
1957
1959         /* get an extra reference on the mle.
1960          * otherwise the assert_master from the new
1961          * master will destroy this.
1962          * also, make sure that all callers of dlm_get_mle
1963          * take both dlm->spinlock and dlm->master_lock */
1964         spin_lock(&dlm->spinlock);
1965         spin_lock(&dlm->master_lock);
1966         dlm_get_mle(mle);
1967         spin_unlock(&dlm->master_lock);
1968         spin_unlock(&dlm->spinlock);
1969
1970         /* notify new node and send all lock state */
1971         /* call send_one_lockres with migration flag.
1972          * this serves as notice to the target node that a
1973          * migration is starting. */
1974         ret = dlm_send_one_lockres(dlm, res, mres, target,
1975                                    DLM_MRES_MIGRATION);
1976
1977         if (ret < 0) {
1978                 mlog(0, "migration to node %u failed with %d\n",
1979                      target, ret);
1980                 /* migration failed, detach and clean up mle */
1981                 dlm_mle_detach_hb_events(dlm, mle);
1982                 dlm_put_mle(mle);
1983                 dlm_put_mle(mle);
1984                 goto leave;
1985         }
1986
1987         /* at this point, the target sends a message to all nodes,
1988          * (using dlm_do_migrate_request).  this node is skipped since
1989          * we had to put an mle in the list to begin the process.  this
1990          * node now waits for target to do an assert master.  this node
1991          * will be the last one notified, ensuring that the migration
1992          * is complete everywhere.  if the target dies while this is
1993          * going on, some nodes could potentially see the target as the
1994          * master, so it is important that my recovery finds the migration
1995          * mle and sets the master to UNKNOWN. */
1996
1998         /* wait for new node to assert master */
1999         while (1) {
2000                 ret = wait_event_interruptible_timeout(mle->wq,
2001                                         (atomic_read(&mle->woken) == 1),
2002                                         msecs_to_jiffies(5000));
2003
2004                 if (ret >= 0) {
2005                         if (atomic_read(&mle->woken) == 1 ||
2006                             res->owner == target)
2007                                 break;
2008
2009                         mlog(0, "timed out during migration\n");
2010                 }
2011                 if (ret == -ERESTARTSYS) {
2012                         /* migration failed, detach and clean up mle */
2013                         dlm_mle_detach_hb_events(dlm, mle);
2014                         dlm_put_mle(mle);
2015                         dlm_put_mle(mle);
2016                         goto leave;
2017                 }
2018                 /* TODO: if node died: stop, clean up, return error */
2019         }
2020
2021         /* all done, set the owner, clear the flag */
2022         spin_lock(&res->spinlock);
2023         dlm_set_lockres_owner(dlm, res, target);
2024         res->state &= ~DLM_LOCK_RES_MIGRATING;
2025         dlm_remove_nonlocal_locks(dlm, res);
2026         spin_unlock(&res->spinlock);
2027         wake_up(&res->wq);
2028
2029         /* master is known, detach if not already detached */
2030         dlm_mle_detach_hb_events(dlm, mle);
2031         dlm_put_mle(mle);
2032         ret = 0;
2033
2034         dlm_lockres_calc_usage(dlm, res);
2035
2036 leave:
2037         /* re-dirty the lockres if we failed */
2038         if (ret < 0)
2039                 dlm_kick_thread(dlm, res);
2040
2041         /* TODO: cleanup */
2042         if (mres)
2043                 free_page((unsigned long)mres);
2044
2045         dlm_put(dlm);
2046
2047         mlog(0, "returning %d\n", ret);
2048         return ret;
2049 }
2050 EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
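
/*
 * Illustrative caller (hypothetical, not part of this file): hand an
 * otherwise-unused lockres off to any other live node.  A target of
 * O2NM_MAX_NODES fails the validity test above, which forces
 * dlm_pick_migration_target() to choose a node from the lock queues or
 * the domain map.
 */
static int dlm_migrate_anywhere_sketch(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res)
{
        int ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
        if (ret == -ENOTEMPTY)
                mlog(0, "%.*s still has locks owned by this node\n",
                     res->lockname.len, res->lockname.name);
        return ret;
}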
2051
2052 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2053 {
2054         int ret;
2055         spin_lock(&dlm->ast_lock);
2056         spin_lock(&lock->spinlock);
2057         ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2058         spin_unlock(&lock->spinlock);
2059         spin_unlock(&dlm->ast_lock);
2060         return ret;
2061 }
2062
2063 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2064                                      struct dlm_lock_resource *res,
2065                                      u8 mig_target)
2066 {
2067         int can_proceed;
2068         spin_lock(&res->spinlock);
2069         can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2070         spin_unlock(&res->spinlock);
2071
2072         /* target has died, so make the caller break out of the
2073          * wait_event, but caller must recheck the domain_map */
2074         spin_lock(&dlm->spinlock);
2075         if (!test_bit(mig_target, dlm->domain_map))
2076                 can_proceed = 1;
2077         spin_unlock(&dlm->spinlock);
2078         return can_proceed;
2079 }
2080
2081 int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2082 {
2083         int ret;
2084         spin_lock(&res->spinlock);
2085         ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2086         spin_unlock(&res->spinlock);
2087         return ret;
2088 }
2089
2090
2091 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2092                                        struct dlm_lock_resource *res,
2093                                        u8 target)
2094 {
2095         int ret = 0;
2096
2097         mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2098                res->lockname.len, res->lockname.name, dlm->node_num,
2099                target);
2100         /* need to set MIGRATING flag on lockres.  this is done by
2101          * ensuring that all asts have been flushed for this lockres. */
2102         spin_lock(&res->spinlock);
2103         BUG_ON(res->migration_pending);
2104         res->migration_pending = 1;
2105         /* strategy is to reserve an extra ast then release
2106          * it below, letting the release do all of the work */
2107         __dlm_lockres_reserve_ast(res);
2108         spin_unlock(&res->spinlock);
2109
2110         /* now flush all the pending asts.. hang out for a bit */
2111         dlm_kick_thread(dlm, res);
2112         wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2113         dlm_lockres_release_ast(dlm, res);
2114
2115         mlog(0, "about to wait on migration_wq, dirty=%s\n",
2116                res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2117         /* if the extra ref we just put was the final one, this
2118          * will pass through immediately.  otherwise, we need to wait
2119          * for the last ast to finish. */
2120 again:
2121         ret = wait_event_interruptible_timeout(dlm->migration_wq,
2122                    dlm_migration_can_proceed(dlm, res, target),
2123                    msecs_to_jiffies(1000));
2124         if (ret < 0) {
2125                 mlog(0, "woken again: migrating? %s, dead? %s\n",
2126                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2127                        test_bit(target, dlm->domain_map) ? "no":"yes");
2128         } else {
2129                 mlog(0, "all is well: migrating? %s, dead? %s\n",
2130                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2131                        test_bit(target, dlm->domain_map) ? "no":"yes");
2132         }
2133         if (!dlm_migration_can_proceed(dlm, res, target)) {
2134                 mlog(0, "trying again...\n");
2135                 goto again;
2136         }
2137
2138         /* did the target go down or die? */
2139         spin_lock(&dlm->spinlock);
2140         if (!test_bit(target, dlm->domain_map)) {
2141                 mlog(ML_ERROR, "migration target %u just went down!\n",
2142                      target);
2143                 ret = -EHOSTDOWN;
2144         }
2145         spin_unlock(&dlm->spinlock);
2146
2147         /*
2148          * at this point:
2149          *
2150          *   o the DLM_LOCK_RES_MIGRATING flag is set
2151          *   o there are no pending asts on this lockres
2152          *   o all processes trying to reserve an ast on this
2153          *     lockres must wait for the MIGRATING flag to clear
2154          */
2155         return ret;
2156 }
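
/*
 * Timeline of the handshake above, for clarity:
 *
 *   1. set migration_pending and reserve an extra ast
 *   2. kick the dlm thread and wait until the lockres is no longer dirty
 *   3. release the reserved ast; if it was the last one outstanding,
 *      dlm_lockres_release_ast() atomically sets DLM_LOCK_RES_MIGRATING
 *   4. wait on dlm->migration_wq until the flag is set or the target dies
 */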
2157
2158 /* last step in the migration process.
2159  * original master calls this to free all of the dlm_lock
2160  * structures that used to be for other nodes. */
2161 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2162                                       struct dlm_lock_resource *res)
2163 {
2164         struct list_head *iter, *iter2;
2165         struct list_head *queue = &res->granted;
2166         int i;
2167         struct dlm_lock *lock;
2168
2169         assert_spin_locked(&res->spinlock);
2170
2171         BUG_ON(res->owner == dlm->node_num);
2172
2173         for (i=0; i<3; i++) {
2174                 list_for_each_safe(iter, iter2, queue) {
2175                         lock = list_entry(iter, struct dlm_lock, list);
2176                         if (lock->ml.node != dlm->node_num) {
2177                                 mlog(0, "putting lock for node %u\n",
2178                                      lock->ml.node);
2179                                 /* be extra careful */
2180                                 BUG_ON(!list_empty(&lock->ast_list));
2181                                 BUG_ON(!list_empty(&lock->bast_list));
2182                                 BUG_ON(lock->ast_pending);
2183                                 BUG_ON(lock->bast_pending);
2184                                 list_del_init(&lock->list);
2185                                 dlm_lock_put(lock);
2186                         }
2187                 }
2188                 queue++;
2189         }
2190 }
2191
2192 /* for now this is not too intelligent.  we will
2193  * need stats to make this do the right thing.
2194  * this just finds the first lock on one of the
2195  * queues and uses that node as the target. */
2196 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2197                                     struct dlm_lock_resource *res)
2198 {
2199         int i;
2200         struct list_head *queue = &res->granted;
2201         struct list_head *iter;
2202         struct dlm_lock *lock;
2203         int nodenum;
2204
2205         assert_spin_locked(&dlm->spinlock);
2206
2207         spin_lock(&res->spinlock);
2208         for (i=0; i<3; i++) {
2209                 list_for_each(iter, queue) {
2210                         /* up to the caller to make sure this node
2211                          * is alive */
2212                         lock = list_entry(iter, struct dlm_lock, list);
2213                         if (lock->ml.node != dlm->node_num) {
2214                                 spin_unlock(&res->spinlock);
2215                                 return lock->ml.node;
2216                         }
2217                 }
2218                 queue++;
2219         }
2220         spin_unlock(&res->spinlock);
2221         mlog(0, "have not found a suitable target yet! checking domain map\n");
2222
2223         /* ok now we're getting desperate.  pick anyone alive. */
2224         nodenum = -1;
2225         while (1) {
2226                 nodenum = find_next_bit(dlm->domain_map,
2227                                         O2NM_MAX_NODES, nodenum+1);
2228                 mlog(0, "found %d in domain map\n", nodenum);
2229                 if (nodenum >= O2NM_MAX_NODES)
2230                         break;
2231                 if (nodenum != dlm->node_num) {
2232                         mlog(0, "picking %d\n", nodenum);
2233                         return nodenum;
2234                 }
2235         }
2236
2237         mlog(0, "giving up.  no master to migrate to\n");
2238         return DLM_LOCK_RES_OWNER_UNKNOWN;
2239 }
2240
2243 /* this is called by the new master once all lockres
2244  * data has been received */
2245 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2246                                   struct dlm_lock_resource *res,
2247                                   u8 master, u8 new_master,
2248                                   struct dlm_node_iter *iter)
2249 {
2250         struct dlm_migrate_request migrate;
2251         int ret, status = 0;
2252         int nodenum;
2253
2254         memset(&migrate, 0, sizeof(migrate));
2255         migrate.namelen = res->lockname.len;
2256         memcpy(migrate.name, res->lockname.name, migrate.namelen);
2257         migrate.new_master = new_master;
2258         migrate.master = master;
2259
2260         ret = 0;
2261
2262         /* send message to all nodes, except the master and myself */
2263         while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2264                 if (nodenum == master ||
2265                     nodenum == new_master)
2266                         continue;
2267
2268                 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2269                                          &migrate, sizeof(migrate), nodenum,
2270                                          &status);
2271                 if (ret < 0)
2272                         mlog_errno(ret);
2273                 else if (status < 0) {
2274                         mlog(0, "migrate request (node %u) returned %d!\n",
2275                              nodenum, status);
2276                         ret = status;
2277                 }
2278         }
2279
2280         if (ret < 0)
2281                 mlog_errno(ret);
2282
2283         mlog(0, "returning ret=%d\n", ret);
2284         return ret;
2285 }
2286
2287
2288 /* if there is an existing mle for this lockres, we now know who the master
2289  * is (the one who sent us *this* message), so we can clear it up right away.
2290  * since the process that put the mle on the list still has a reference to it,
2291  * we can unhash it now, set the master and wake the process.  as a result,
2292  * we will have no mle in the list to start with.  now we can add an mle for
2293  * the migration and this should be the only one found for those scanning the
2294  * list.  */
2295 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2296 {
2297         struct dlm_ctxt *dlm = data;
2298         struct dlm_lock_resource *res = NULL;
2299         struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2300         struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2301         const char *name;
2302         unsigned int namelen;
2303         int ret = 0;
2304
2305         if (!dlm_grab(dlm))
2306                 return -EINVAL;
2307
2308         name = migrate->name;
2309         namelen = migrate->namelen;
2310
2311         /* preallocate; if this fails, abort */
2312         mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2313                                                          GFP_KERNEL);
2314
2315         if (!mle) {
2316                 ret = -ENOMEM;
2317                 goto leave;
2318         }
2319
2320         /* check for pre-existing lock */
2321         spin_lock(&dlm->spinlock);
2322         res = __dlm_lookup_lockres(dlm, name, namelen);
2323         spin_lock(&dlm->master_lock);
2324
2325         if (res) {
2326                 spin_lock(&res->spinlock);
2327                 if (res->state & DLM_LOCK_RES_RECOVERING) {
2328                         /* if all is working ok, this can only mean that we got
2329                          * a migrate request from a node that we now see as
2330                          * dead.  what can we do here?  drop it to the floor? */
2331                         spin_unlock(&res->spinlock);
2332                         mlog(ML_ERROR, "Got a migrate request, but the "
2333                              "lockres is marked as recovering!\n");
2334                         kmem_cache_free(dlm_mle_cache, mle);
2335                         ret = -EINVAL; /* need a better solution */
2336                         goto unlock;
2337                 }
2338                 res->state |= DLM_LOCK_RES_MIGRATING;
2339                 spin_unlock(&res->spinlock);
2340         }
2341
2342         /* ignore status; in this path any conflict would have BUG()'d inside */
2343         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
2344                                     name, namelen,
2345                                     migrate->new_master,
2346                                     migrate->master);
2347
2348 unlock:
2349         spin_unlock(&dlm->master_lock);
2350         spin_unlock(&dlm->spinlock);
2351
2352         if (oldmle) {
2353                 /* master is known, detach if not already detached */
2354                 dlm_mle_detach_hb_events(dlm, oldmle);
2355                 dlm_put_mle(oldmle);
2356         }
2357
2358         if (res)
2359                 dlm_lockres_put(res);
2360 leave:
2361         dlm_put(dlm);
2362         return ret;
2363 }
2364
2365 /* must be holding dlm->spinlock and dlm->master_lock
2366  * when adding a migration mle, we can clear any other mles
2367  * in the master list because we know with certainty that
2368  * the master is "master".  so we remove any old mle from
2369  * the list after setting its master field, and then add
2370  * the new migration mle.  this way we can hold with the rule
2371  * of having only one mle for a given lock name at all times. */
2372 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2373                                  struct dlm_lock_resource *res,
2374                                  struct dlm_master_list_entry *mle,
2375                                  struct dlm_master_list_entry **oldmle,
2376                                  const char *name, unsigned int namelen,
2377                                  u8 new_master, u8 master)
2378 {
2379         int found;
2380         int ret = 0;
2381
2382         *oldmle = NULL;
2383
2384         mlog_entry_void();
2385
2386         assert_spin_locked(&dlm->spinlock);
2387         assert_spin_locked(&dlm->master_lock);
2388
2389         /* caller is responsible for any ref taken here on oldmle */
2390         found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
2391         if (found) {
2392                 struct dlm_master_list_entry *tmp = *oldmle;
2393                 spin_lock(&tmp->spinlock);
2394                 if (tmp->type == DLM_MLE_MIGRATION) {
2395                         if (master == dlm->node_num) {
2396                                 /* ah another process raced me to it */
2397                                 mlog(0, "tried to migrate %.*s, but some "
2398                                      "process beat me to it\n",
2399                                      namelen, name);
2400                                 ret = -EEXIST;
2401                         } else {
2402                                 /* bad: two nodes are trying to migrate! */
2403                                 mlog(ML_ERROR, "migration error, mle: "
2404                                      "master=%u new_master=%u // request: "
2405                                      "master=%u new_master=%u // "
2406                                      "lockres=%.*s\n",
2407                                      tmp->master, tmp->new_master,
2408                                      master, new_master,
2409                                      namelen, name);
2410                                 BUG();
2411                         }
2412                 } else {
2413                         /* this is essentially what assert_master does */
2414                         tmp->master = master;
2415                         atomic_set(&tmp->woken, 1);
2416                         wake_up(&tmp->wq);
2417                         /* remove it from the list so that only one
2418                          * mle will be found */
2419                         list_del_init(&tmp->list);
2420                 }
2421                 spin_unlock(&tmp->spinlock);
2422         }
2423
2424         /* now add a migration mle to the tail of the list */
2425         dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2426         mle->new_master = new_master;
2427         mle->master = master;
2428         /* do this for consistency with other mle types */
2429         set_bit(new_master, mle->maybe_map);
2430         list_add(&mle->list, &dlm->master_list);
2431
2432         return ret;
2433 }
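
/*
 * Condensed sketch of the calling sequence (mirrors dlm_migrate_lockres
 * above, illustrative only): both locks must be held, in this order,
 * around the add.
 */
static int dlm_add_migration_mle_sketch(struct dlm_ctxt *dlm,
                                        struct dlm_lock_resource *res,
                                        struct dlm_master_list_entry *mle,
                                        struct dlm_master_list_entry **oldmle,
                                        u8 target)
{
        int ret;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        ret = dlm_add_migration_mle(dlm, res, mle, oldmle,
                                    res->lockname.name, res->lockname.len,
                                    target, dlm->node_num);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
        /* -EEXIST means another process is already migrating this lockres */
        return ret;
}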
2434
2435
2436 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2437 {
2438         struct list_head *iter, *iter2;
2439         struct dlm_master_list_entry *mle;
2440         struct dlm_lock_resource *res;
2441
2442         mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2443 top:
2444         assert_spin_locked(&dlm->spinlock);
2445
2446         /* clean the master list */
2447         spin_lock(&dlm->master_lock);
2448         list_for_each_safe(iter, iter2, &dlm->master_list) {
2449                 mle = list_entry(iter, struct dlm_master_list_entry, list);
2450
2451                 BUG_ON(mle->type != DLM_MLE_BLOCK &&
2452                        mle->type != DLM_MLE_MASTER &&
2453                        mle->type != DLM_MLE_MIGRATION);
2454
2455                 /* MASTER mles are initiated locally.  the waiting
2456                  * process will notice the node map change
2457                  * shortly.  let that happen as normal. */
2458                 if (mle->type == DLM_MLE_MASTER)
2459                         continue;
2460
2462                 /* BLOCK mles are initiated by other nodes.
2463                  * need to clean up if the dead node would have
2464                  * been the master. */
2465                 if (mle->type == DLM_MLE_BLOCK) {
2466                         int bit;
2467
2468                         spin_lock(&mle->spinlock);
2469                         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
2470                         if (bit != dead_node) {
2471                                 mlog(0, "mle found, but dead node %u would "
2472                                      "not have been master\n", dead_node);
2473                                 spin_unlock(&mle->spinlock);
2474                         } else {
2475                                 /* must drop the refcount by one since the
2476                                  * assert_master will never arrive.  this
2477                                  * may result in the mle being unlinked and
2478                                  * freed, but there may still be a process
2479                                  * waiting in the dlmlock path which is fine. */
2480                                 mlog(ML_ERROR, "node %u was expected master\n",
2481                                      dead_node);
2482                                 atomic_set(&mle->woken, 1);
2483                                 spin_unlock(&mle->spinlock);
2484                                 wake_up(&mle->wq);
2485                                 /* final put will take care of list removal */
2486                                 __dlm_put_mle(mle);
2487                         }
2488                         continue;
2489                 }
2490
2491                 /* everything else is a MIGRATION mle */
2492
2493                 /* the rule for MIGRATION mles is that the master
2494                  * becomes UNKNOWN if *either* the original or
2495                  * the new master dies.  all UNKNOWN lockreses
2496                  * are sent to whichever node becomes the recovery
2497                  * master.  the new master is responsible for
2498                  * determining if there is still a master for
2499                  * this lockres, or if it needs to take over
2500                  * mastery.  either way, this node should expect
2501                  * another message to resolve this. */
2502                 if (mle->master != dead_node &&
2503                     mle->new_master != dead_node)
2504                         continue;
2505
2506                 /* if we have reached this point, this mle needs to
2507                  * be removed from the list and freed. */
2508
2509                 /* remove from the list early.  NOTE: unlinking
2510                  * list_head while in list_for_each_safe */
2511                 spin_lock(&mle->spinlock);
2512                 list_del_init(&mle->list);
2513                 atomic_set(&mle->woken, 1);
2514                 spin_unlock(&mle->spinlock);
2515                 wake_up(&mle->wq);
2516
2517                 mlog(0, "node %u died during migration from "
2518                      "%u to %u!\n", dead_node,
2519                      mle->master, mle->new_master);
2520                 /* if there is a lockres associated with this
2521                  * mle, find it and set its owner to UNKNOWN */
2522                 res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2523                                         mle->u.name.len);
2524                 if (res) {
2525                         /* unfortunately if we hit this rare case, our
2526                          * lock ordering is messed up.  we need to drop
2527                          * the master lock so that we can take the
2528                          * lockres lock, meaning that we will have to
2529                          * restart from the head of list. */
2530                         spin_unlock(&dlm->master_lock);
2531
2532                         /* move lockres onto recovery list */
2533                         spin_lock(&res->spinlock);
2534                         dlm_set_lockres_owner(dlm, res,
2535                                         DLM_LOCK_RES_OWNER_UNKNOWN);
2536                         dlm_move_lockres_to_recovery_list(dlm, res);
2537                         spin_unlock(&res->spinlock);
2538                         dlm_lockres_put(res);
2539
2540                         /* dump the mle */
2541                         spin_lock(&dlm->master_lock);
2542                         __dlm_put_mle(mle);
2543                         spin_unlock(&dlm->master_lock);
2544
2545                         /* restart */
2546                         goto top;
2547                 }
2548
2549                 /* this may be the last reference */
2550                 __dlm_put_mle(mle);
2551         }
2552         spin_unlock(&dlm->master_lock);
2553 }
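
/*
 * Cleanup rules applied above, by mle type:
 *
 *   MASTER    - left alone; the local waiter will notice the node map
 *               change on its own
 *   BLOCK     - the ref is dropped if the dead node was the expected
 *               master, since its assert_master will never arrive
 *   MIGRATION - if either the old or the new master died, the mle is
 *               unlinked and any matching lockres is handed to recovery
 *               with an UNKNOWN owner
 */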
2554
2555
2556 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2557                          u8 old_master)
2558 {
2559         struct dlm_node_iter iter;
2560         int ret = 0;
2561
2562         spin_lock(&dlm->spinlock);
2563         dlm_node_iter_init(dlm->domain_map, &iter);
2564         clear_bit(old_master, iter.node_map);
2565         clear_bit(dlm->node_num, iter.node_map);
2566         spin_unlock(&dlm->spinlock);
2567
2568         mlog(0, "now time to do a migrate request to other nodes\n");
2569         ret = dlm_do_migrate_request(dlm, res, old_master,
2570                                      dlm->node_num, &iter);
2571         if (ret < 0) {
2572                 mlog_errno(ret);
2573                 goto leave;
2574         }
2575
2576         mlog(0, "doing assert master of %.*s to all except the original node\n",
2577              res->lockname.len, res->lockname.name);
2578         /* this call now finishes out the nodemap
2579          * even if one or more nodes die */
2580         ret = dlm_do_assert_master(dlm, res->lockname.name,
2581                                    res->lockname.len, iter.node_map,
2582                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
2583         if (ret < 0) {
2584                 /* no longer need to retry.  all living nodes contacted. */
2585                 mlog_errno(ret);
2586                 ret = 0;
2587         }
2588
2589         memset(iter.node_map, 0, sizeof(iter.node_map));
2590         set_bit(old_master, iter.node_map);
2591         mlog(0, "doing assert master of %.*s back to %u\n",
2592              res->lockname.len, res->lockname.name, old_master);
2593         ret = dlm_do_assert_master(dlm, res->lockname.name,
2594                                    res->lockname.len, iter.node_map,
2595                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
2596         if (ret < 0) {
2597                 mlog(0, "assert master to original master failed "
2598                      "with %d.\n", ret);
2599                 /* the only nonzero status here would be because of
2600                  * a dead original node.  we're done. */
2601                 ret = 0;
2602         }
2603
2604         /* all done, set the owner, clear the flag */
2605         spin_lock(&res->spinlock);
2606         dlm_set_lockres_owner(dlm, res, dlm->node_num);
2607         res->state &= ~DLM_LOCK_RES_MIGRATING;
2608         spin_unlock(&res->spinlock);
2609         /* re-dirty it on the new master */
2610         dlm_kick_thread(dlm, res);
2611         wake_up(&res->wq);
2612 leave:
2613         return ret;
2614 }
2615
2616 /*
2617  * LOCKRES AST REFCOUNT
2618  * this is integral to migration
2619  */
2620
2621 /* for future intent to call an ast, reserve one ahead of time.
2622  * this should be called only after waiting on the lockres
2623  * with dlm_wait_on_lockres, and while still holding the
2624  * spinlock after the call. */
2625 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
2626 {
2627         assert_spin_locked(&res->spinlock);
2628         if (res->state & DLM_LOCK_RES_MIGRATING) {
2629                 __dlm_print_one_lock_resource(res);
2630         }
2631         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2632
2633         atomic_inc(&res->asts_reserved);
2634 }
2635
2636 /*
2637  * used to drop the reserved ast, either because it went unused,
2638  * or because the ast/bast was actually called.
2639  *
2640  * also, if there is a pending migration on this lockres,
2641  * and this was the last pending ast on the lockres,
2642  * atomically set the MIGRATING flag before we drop the lock.
2643  * this is how we ensure that migration can proceed with no
2644  * asts in progress.  note that it is ok if the state of the
2645  * queues is such that a lock should be granted in the future
2646  * or that a bast should be fired, because the new master will
2647  * shuffle the lists on this lockres as soon as it is migrated.
2648  */
2649 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
2650                              struct dlm_lock_resource *res)
2651 {
2652         if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
2653                 return;
2654
2655         if (!res->migration_pending) {
2656                 spin_unlock(&res->spinlock);
2657                 return;
2658         }
2659
2660         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2661         res->migration_pending = 0;
2662         res->state |= DLM_LOCK_RES_MIGRATING;
2663         spin_unlock(&res->spinlock);
2664         wake_up(&res->wq);
2665         wake_up(&dlm->migration_wq);
2666 }
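
/*
 * A minimal sketch of the pattern ast senders are assumed to follow
 * (hypothetical helper, not part of this file): reserve under the lockres
 * spinlock before queueing an ast, release once it has been delivered.
 */
static void dlm_ast_refcount_sketch(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res)
{
        spin_lock(&res->spinlock);
        __dlm_lockres_reserve_ast(res);  /* declare intent to fire an ast */
        spin_unlock(&res->spinlock);

        /* ... queue or deliver the ast/bast here ... */

        /* this may atomically set DLM_LOCK_RES_MIGRATING if it was the
         * last reserved ast and a migration is pending */
        dlm_lockres_release_ast(dlm, res);
}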