]> err.no Git - linux-2.6/blobdiff - fs/dlm/lock.c
dlm: reject messages from non-members
[linux-2.6] / fs / dlm / lock.c
index 2082daf083d86cdec3b3697e061f8f3ff703e867..c3b9fca17044f8eb8221691f3372fe042650ca5a 100644 (file)
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -88,7 +88,6 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 static int receive_extralen(struct dlm_message *ms);
 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
 static void del_timeout(struct dlm_lkb *lkb);
-void dlm_timeout_warn(struct dlm_lkb *lkb);
 
 /*
  * Lock compatibilty matrix - thanks Steve
@@ -335,7 +334,7 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
 {
        struct dlm_rsb *r;
 
-       r = allocate_rsb(ls, len);
+       r = dlm_allocate_rsb(ls, len);
        if (!r)
                return NULL;
 
@@ -478,7 +477,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
        if (!error) {
                write_unlock(&ls->ls_rsbtbl[bucket].lock);
-               free_rsb(r);
+               dlm_free_rsb(r);
                r = tmp;
                goto out;
        }
@@ -519,7 +518,7 @@ static void toss_rsb(struct kref *kref)
        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
-               free_lvb(r->res_lvbptr);
+               dlm_free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
 }
@@ -589,7 +588,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
        uint32_t lkid = 0;
        uint16_t bucket;
 
-       lkb = allocate_lkb(ls);
+       lkb = dlm_allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;
 
@@ -683,8 +682,8 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 
                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
-                       free_lvb(lkb->lkb_lvbptr);
-               free_lkb(lkb);
+                       dlm_free_lvb(lkb->lkb_lvbptr);
+               dlm_free_lkb(lkb);
                return 1;
        } else {
                write_unlock(&ls->ls_lkbtbl[bucket].lock);
@@ -988,7 +987,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 
                        if (is_master(r))
                                dir_remove(r);
-                       free_rsb(r);
+                       dlm_free_rsb(r);
                        count++;
                } else {
                        write_unlock(&ls->ls_rsbtbl[b].lock);
@@ -1171,7 +1170,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
                        return;
 
                if (!r->res_lvbptr)
-                       r->res_lvbptr = allocate_lvb(r->res_ls);
+                       r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
 
                if (!r->res_lvbptr)
                        return;
@@ -1203,7 +1202,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
                return;
 
        if (!r->res_lvbptr)
-               r->res_lvbptr = allocate_lvb(r->res_ls);
+               r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
 
        if (!r->res_lvbptr)
                return;
@@ -1941,8 +1940,11 @@ static void confirm_master(struct dlm_rsb *r, int error)
                break;
 
        case -EAGAIN:
-               /* the remote master didn't queue our NOQUEUE request;
-                  make a waiting lkb the first_lkid */
+       case -EBADR:
+       case -ENOTBLK:
+               /* the remote request failed and won't be retried (it was
+                  a NOQUEUE, or has been canceled/unlocked); make a waiting
+                  lkb the first_lkid */
 
                r->res_first_lkid = 0;
 
@@ -2986,7 +2988,7 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                if (!lkb->lkb_lvbptr)
-                       lkb->lkb_lvbptr = allocate_lvb(ls);
+                       lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                len = receive_extralen(ms);
@@ -3010,7 +3012,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                /* lkb was just created so there won't be an lvb yet */
-               lkb->lkb_lvbptr = allocate_lvb(ls);
+               lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
        }
@@ -3383,6 +3385,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
                if (is_overlap(lkb)) {
                        /* we'll ignore error in cancel/unlock reply */
                        queue_cast_overlap(r, lkb);
+                       confirm_master(r, result);
                        unhold_lkb(lkb); /* undoes create_lkb() */
                } else
                        _request_lock(r, lkb);
@@ -3638,53 +3641,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
        dlm_put_lkb(lkb);
 }
 
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
 {
-       struct dlm_message *ms = (struct dlm_message *) hd;
-       struct dlm_ls *ls;
-       int error = 0;
-
-       if (!recovery)
-               dlm_message_in(ms);
-
-       ls = dlm_find_lockspace_global(hd->h_lockspace);
-       if (!ls) {
-               log_print("drop message %d from %d for unknown lockspace %d",
-                         ms->m_type, nodeid, hd->h_lockspace);
-               return -EINVAL;
-       }
-
-       /* recovery may have just ended leaving a bunch of backed-up requests
-          in the requestqueue; wait while dlm_recoverd clears them */
-
-       if (!recovery)
-               dlm_wait_requestqueue(ls);
-
-       /* recovery may have just started while there were a bunch of
-          in-flight requests -- save them in requestqueue to be processed
-          after recovery.  we can't let dlm_recvd block on the recovery
-          lock.  if dlm_recoverd is calling this function to clear the
-          requestqueue, it needs to be interrupted (-EINTR) if another
-          recovery operation is starting. */
-
-       while (1) {
-               if (dlm_locking_stopped(ls)) {
-                       if (recovery) {
-                               error = -EINTR;
-                               goto out;
-                       }
-                       error = dlm_add_requestqueue(ls, nodeid, hd);
-                       if (error == -EAGAIN)
-                               continue;
-                       else {
-                               error = -EINTR;
-                               goto out;
-                       }
-               }
-
-               if (dlm_lock_recovery_try(ls))
-                       break;
-               schedule();
+       if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
+               log_debug(ls, "ignore non-member message %d from %d %x %x %d",
+                         ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_remid, ms->m_result);
+               return;
        }
 
        switch (ms->m_type) {
@@ -3761,17 +3724,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
                log_error(ls, "unknown message type %d", ms->m_type);
        }
 
-       dlm_unlock_recovery(ls);
- out:
-       dlm_put_lockspace(ls);
        dlm_astd_wake();
-       return error;
 }
 
+/* If the lockspace is in recovery mode (locking stopped), then normal
+   messages are saved on the requestqueue for processing after recovery is
+   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
+   messages off the requestqueue before we process new ones. This occurs right
+   after recovery completes when we transition from saving all messages on
+   requestqueue, to processing all the saved messages, to processing new
+   messages as they arrive. */
 
-/*
- * Recovery related
- */
+static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+                               int nodeid)
+{
+       if (dlm_locking_stopped(ls)) {
+               dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+       } else {
+               dlm_wait_requestqueue(ls);
+               _receive_message(ls, ms);
+       }
+}
+
+/* This is called by dlm_recoverd to process messages that were saved on
+   the requestqueue. */
+
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+{
+       _receive_message(ls, ms);
+}
+
+/* This is called by the midcomms layer when something is received for
+   the lockspace.  It could be either a MSG (normal message sent as part of
+   standard locking activity) or an RCOM (recovery message sent as part of
+   lockspace recovery). */
+
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+{
+       struct dlm_message *ms = (struct dlm_message *) hd;
+       struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+       struct dlm_ls *ls;
+       int type = 0;
+
+       switch (hd->h_cmd) {
+       case DLM_MSG:
+               dlm_message_in(ms);
+               type = ms->m_type;
+               break;
+       case DLM_RCOM:
+               dlm_rcom_in(rc);
+               type = rc->rc_type;
+               break;
+       default:
+               log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+               return;
+       }
+
+       if (hd->h_nodeid != nodeid) {
+               log_print("invalid h_nodeid %d from %d lockspace %x",
+                         hd->h_nodeid, nodeid, hd->h_lockspace);
+               return;
+       }
+
+       ls = dlm_find_lockspace_global(hd->h_lockspace);
+       if (!ls) {
+               log_print("invalid h_lockspace %x from %d cmd %d type %d",
+                         hd->h_lockspace, nodeid, hd->h_cmd, type);
+
+               if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
+                       dlm_send_ls_not_ready(nodeid, rc);
+               return;
+       }
+
+       /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
+          be inactive (in this ls) before transitioning to recovery mode */
+
+       down_read(&ls->ls_recv_active);
+       if (hd->h_cmd == DLM_MSG)
+               dlm_receive_message(ls, ms, nodeid);
+       else
+               dlm_receive_rcom(ls, rc, nodeid);
+       up_read(&ls->ls_recv_active);
+
+       dlm_put_lockspace(ls);
+}
 
 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
@@ -3821,6 +3857,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
 void dlm_recover_waiters_pre(struct dlm_ls *ls)
 {
        struct dlm_lkb *lkb, *safe;
+       int wait_type, stub_unlock_result, stub_cancel_result;
 
        mutex_lock(&ls->ls_waiters_mutex);
 
@@ -3839,7 +3876,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                if (!waiter_needs_recovery(ls, lkb))
                        continue;
 
-               switch (lkb->lkb_wait_type) {
+               wait_type = lkb->lkb_wait_type;
+               stub_unlock_result = -DLM_EUNLOCK;
+               stub_cancel_result = -DLM_ECANCEL;
+
+               /* Main reply may have been received leaving a zero wait_type,
+                  but a reply for the overlapping op may not have been
+                  received.  In that case we need to fake the appropriate
+                  reply for the overlap op. */
+
+               if (!wait_type) {
+                       if (is_overlap_cancel(lkb)) {
+                               wait_type = DLM_MSG_CANCEL;
+                               if (lkb->lkb_grmode == DLM_LOCK_IV)
+                                       stub_cancel_result = 0;
+                       }
+                       if (is_overlap_unlock(lkb)) {
+                               wait_type = DLM_MSG_UNLOCK;
+                               if (lkb->lkb_grmode == DLM_LOCK_IV)
+                                       stub_unlock_result = -ENOENT;
+                       }
+
+                       log_debug(ls, "rwpre overlap %x %x %d %d %d",
+                                 lkb->lkb_id, lkb->lkb_flags, wait_type,
+                                 stub_cancel_result, stub_unlock_result);
+               }
+
+               switch (wait_type) {
 
                case DLM_MSG_REQUEST:
                        lkb->lkb_flags |= DLM_IFL_RESEND;
@@ -3852,7 +3915,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_UNLOCK:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
-                       ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+                       ls->ls_stub_ms.m_result = stub_unlock_result;
                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
@@ -3861,15 +3924,15 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_CANCEL:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
-                       ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+                       ls->ls_stub_ms.m_result = stub_cancel_result;
                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
                        break;
 
                default:
-                       log_error(ls, "invalid lkb wait_type %d",
-                                 lkb->lkb_wait_type);
+                       log_error(ls, "invalid lkb wait_type %d %d",
+                                 lkb->lkb_wait_type, wait_type);
                }
                schedule();
        }
@@ -4158,7 +4221,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
-               lkb->lkb_lvbptr = allocate_lvb(ls);
+               lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
@@ -4233,7 +4296,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        put_rsb(r);
  out:
        if (error)
-               log_print("recover_master_copy %d %x", error, rl->rl_lkid);
+               log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
        rl->rl_result = error;
        return error;
 }
@@ -4316,7 +4379,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
                }
        }
 
-       /* After ua is attached to lkb it will be freed by free_lkb().
+       /* After ua is attached to lkb it will be freed by dlm_free_lkb().
           When DLM_IFL_USER is set, the dlm knows that this is a userspace
           lock and that lkb_astparam is the dlm_user_args structure. */
 
@@ -4429,7 +4492,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 
        if (lvb_in && ua->lksb.sb_lvbptr)
                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
-       ua->castparam = ua_tmp->castparam;
+       if (ua_tmp->castparam)
+               ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;
 
        error = set_unlock_args(flags, ua, &args);
@@ -4474,7 +4538,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                goto out;
 
        ua = (struct dlm_user_args *)lkb->lkb_astparam;
-       ua->castparam = ua_tmp->castparam;
+       if (ua_tmp->castparam)
+               ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;
 
        error = set_unlock_args(flags, ua, &args);
@@ -4651,6 +4716,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
        }
 
        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+               lkb->lkb_ast_type = 0;
                list_del(&lkb->lkb_astqueue);
                dlm_put_lkb(lkb);
        }