]> err.no Git - linux-2.6/blobdiff - fs/ocfs2/journal.c
Merge commit 'v2.6.27-rc1' into for-linus
[linux-2.6] / fs / ocfs2 / journal.c
index ed0c6d0850d79fca3a5fbab93d8eb528b41d1b40..a8c19cb3cfdd874235d16fc3926c9f9e07bba794 100644 (file)
@@ -64,6 +64,137 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                                 int slot);
 static int ocfs2_commit_thread(void *arg);
 
+
+/*
+ * The recovery_list is a simple linked list of node numbers to recover.
+ * It is protected by the recovery_lock.
+ */
+
+struct ocfs2_recovery_map {
+       unsigned int rm_used;
+       unsigned int *rm_entries;
+};
+
+int ocfs2_recovery_init(struct ocfs2_super *osb)
+{
+       struct ocfs2_recovery_map *rm;
+
+       mutex_init(&osb->recovery_lock);
+       osb->disable_recovery = 0;
+       osb->recovery_thread_task = NULL;
+       init_waitqueue_head(&osb->recovery_event);
+
+       rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
+                    osb->max_slots * sizeof(unsigned int),
+                    GFP_KERNEL);
+       if (!rm) {
+               mlog_errno(-ENOMEM);
+               return -ENOMEM;
+       }
+
+       rm->rm_entries = (unsigned int *)((char *)rm +
+                                         sizeof(struct ocfs2_recovery_map));
+       osb->recovery_map = rm;
+
+       return 0;
+}
+
+/* we can't grab the goofy sem lock from inside wait_event, so we use
+ * memory barriers to make sure that we'll see the null task before
+ * being woken up */
+static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
+{
+       mb();
+       return osb->recovery_thread_task != NULL;
+}
+
+void ocfs2_recovery_exit(struct ocfs2_super *osb)
+{
+       struct ocfs2_recovery_map *rm;
+
+       /* disable any new recovery threads and wait for any currently
+        * running ones to exit. Do this before setting the vol_state. */
+       mutex_lock(&osb->recovery_lock);
+       osb->disable_recovery = 1;
+       mutex_unlock(&osb->recovery_lock);
+       wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
+
+       /* At this point, we know that no more recovery threads can be
+        * launched, so wait for any recovery completion work to
+        * complete. */
+       flush_workqueue(ocfs2_wq);
+
+       /*
+        * Now that recovery is shut down, and the osb is about to be
+        * freed,  the osb_lock is not taken here.
+        */
+       rm = osb->recovery_map;
+       /* XXX: Should we bug if there are dirty entries? */
+
+       kfree(rm);
+}
+
+static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+                                    unsigned int node_num)
+{
+       int i;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       assert_spin_locked(&osb->osb_lock);
+
+       for (i = 0; i < rm->rm_used; i++) {
+               if (rm->rm_entries[i] == node_num)
+                       return 1;
+       }
+
+       return 0;
+}
+
+/* Behaves like test-and-set.  Returns the previous value */
+static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+                                 unsigned int node_num)
+{
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       spin_lock(&osb->osb_lock);
+       if (__ocfs2_recovery_map_test(osb, node_num)) {
+               spin_unlock(&osb->osb_lock);
+               return 1;
+       }
+
+       /* XXX: Can this be exploited? Not from o2dlm... */
+       BUG_ON(rm->rm_used >= osb->max_slots);
+
+       rm->rm_entries[rm->rm_used] = node_num;
+       rm->rm_used++;
+       spin_unlock(&osb->osb_lock);
+
+       return 0;
+}
+
+static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
+                                    unsigned int node_num)
+{
+       int i;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       spin_lock(&osb->osb_lock);
+
+       for (i = 0; i < rm->rm_used; i++) {
+               if (rm->rm_entries[i] == node_num)
+                       break;
+       }
+
+       if (i < rm->rm_used) {
+               /* XXX: be careful with the pointer math */
+               memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
+                       (rm->rm_used - i - 1) * sizeof(unsigned int));
+               rm->rm_used--;
+       }
+
+       spin_unlock(&osb->osb_lock);
+}
+
 static int ocfs2_commit_cache(struct ocfs2_super *osb)
 {
        int status = 0;
@@ -198,7 +329,7 @@ int ocfs2_extend_trans(handle_t *handle, int nblocks)
 
        mlog(0, "Trying to extend transaction by %d blocks\n", nblocks);
 
-#ifdef OCFS2_DEBUG_FS
+#ifdef CONFIG_OCFS2_DEBUG_FS
        status = 1;
 #else
        status = journal_extend(handle, nblocks);
@@ -586,8 +717,7 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local)
 
        mlog_entry_void();
 
-       if (!journal)
-               BUG();
+       BUG_ON(!journal);
 
        osb = journal->j_osb;
 
@@ -650,6 +780,23 @@ bail:
        return status;
 }
 
+static int ocfs2_recovery_completed(struct ocfs2_super *osb)
+{
+       int empty;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       spin_lock(&osb->osb_lock);
+       empty = (rm->rm_used == 0);
+       spin_unlock(&osb->osb_lock);
+
+       return empty;
+}
+
+void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
+{
+       wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
+}
+
 /*
  * JBD Might read a cached version of another nodes journal file. We
  * don't want this as this file changes often and we get no
@@ -848,6 +995,7 @@ static int __ocfs2_recovery_thread(void *arg)
 {
        int status, node_num;
        struct ocfs2_super *osb = arg;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
 
        mlog_entry_void();
 
@@ -863,26 +1011,29 @@ restart:
                goto bail;
        }
 
-       while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
-               node_num = ocfs2_node_map_first_set_bit(osb,
-                                                       &osb->recovery_map);
-               if (node_num == O2NM_INVALID_NODE_NUM) {
-                       mlog(0, "Out of nodes to recover.\n");
-                       break;
-               }
+       spin_lock(&osb->osb_lock);
+       while (rm->rm_used) {
+               /* It's always safe to remove entry zero, as we won't
+                * clear it until ocfs2_recover_node() has succeeded. */
+               node_num = rm->rm_entries[0];
+               spin_unlock(&osb->osb_lock);
 
                status = ocfs2_recover_node(osb, node_num);
-               if (status < 0) {
+               if (!status) {
+                       ocfs2_recovery_map_clear(osb, node_num);
+               } else {
                        mlog(ML_ERROR,
                             "Error %d recovering node %d on device (%u,%u)!\n",
                             status, node_num,
                             MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
                        mlog(ML_ERROR, "Volume requires unmount.\n");
-                       continue;
                }
 
-               ocfs2_recovery_map_clear(osb, node_num);
+               spin_lock(&osb->osb_lock);
        }
+       spin_unlock(&osb->osb_lock);
+       mlog(0, "All nodes recovered\n");
+
        ocfs2_super_unlock(osb, 1);
 
        /* We always run recovery on our own orphan dir - the dead
@@ -893,8 +1044,7 @@ restart:
 
 bail:
        mutex_lock(&osb->recovery_lock);
-       if (!status &&
-           !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
+       if (!status && !ocfs2_recovery_completed(osb)) {
                mutex_unlock(&osb->recovery_lock);
                goto restart;
        }
@@ -924,8 +1074,8 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
 
        /* People waiting on recovery will wait on
         * the recovery map to empty. */
-       if (!ocfs2_recovery_map_set(osb, node_num))
-               mlog(0, "node %d already be in recovery.\n", node_num);
+       if (ocfs2_recovery_map_set(osb, node_num))
+               mlog(0, "node %d already in recovery map.\n", node_num);
 
        mlog(0, "starting recovery thread...\n");
 
@@ -1197,7 +1347,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
                if (status == -ENOENT)
                        continue;
 
-               if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num))
+               if (__ocfs2_recovery_map_test(osb, node_num))
                        continue;
                spin_unlock(&osb->osb_lock);