]> err.no Git - linux-2.6/blobdiff - fs/dlm/lock.c
dlm: reject messages from non-members
[linux-2.6] / fs / dlm / lock.c
index 3915b8e1414623db82bce20d0d4b4c41fc6b9fa3..c3b9fca17044f8eb8221691f3372fe042650ca5a 100644 (file)
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -88,7 +88,6 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 static int receive_extralen(struct dlm_message *ms);
 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
 static void del_timeout(struct dlm_lkb *lkb);
-void dlm_timeout_warn(struct dlm_lkb *lkb);
 
 /*
  * Lock compatibilty matrix - thanks Steve
@@ -335,7 +334,7 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
 {
        struct dlm_rsb *r;
 
-       r = allocate_rsb(ls, len);
+       r = dlm_allocate_rsb(ls, len);
        if (!r)
                return NULL;
 
@@ -478,7 +477,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
        if (!error) {
                write_unlock(&ls->ls_rsbtbl[bucket].lock);
-               free_rsb(r);
+               dlm_free_rsb(r);
                r = tmp;
                goto out;
        }
@@ -519,7 +518,7 @@ static void toss_rsb(struct kref *kref)
        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
-               free_lvb(r->res_lvbptr);
+               dlm_free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
 }
@@ -589,7 +588,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
        uint32_t lkid = 0;
        uint16_t bucket;
 
-       lkb = allocate_lkb(ls);
+       lkb = dlm_allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;
 
@@ -683,8 +682,8 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 
                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
-                       free_lvb(lkb->lkb_lvbptr);
-               free_lkb(lkb);
+                       dlm_free_lvb(lkb->lkb_lvbptr);
+               dlm_free_lkb(lkb);
                return 1;
        } else {
                write_unlock(&ls->ls_lkbtbl[bucket].lock);
@@ -988,7 +987,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 
                        if (is_master(r))
                                dir_remove(r);
-                       free_rsb(r);
+                       dlm_free_rsb(r);
                        count++;
                } else {
                        write_unlock(&ls->ls_rsbtbl[b].lock);
@@ -1171,7 +1170,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
                        return;
 
                if (!r->res_lvbptr)
-                       r->res_lvbptr = allocate_lvb(r->res_ls);
+                       r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
 
                if (!r->res_lvbptr)
                        return;
@@ -1203,7 +1202,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
                return;
 
        if (!r->res_lvbptr)
-               r->res_lvbptr = allocate_lvb(r->res_ls);
+               r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
 
        if (!r->res_lvbptr)
                return;
@@ -1941,8 +1940,11 @@ static void confirm_master(struct dlm_rsb *r, int error)
                break;
 
        case -EAGAIN:
-               /* the remote master didn't queue our NOQUEUE request;
-                  make a waiting lkb the first_lkid */
+       case -EBADR:
+       case -ENOTBLK:
+               /* the remote request failed and won't be retried (it was
+                  a NOQUEUE, or has been canceled/unlocked); make a waiting
+                  lkb the first_lkid */
 
                r->res_first_lkid = 0;
 
@@ -2986,7 +2988,7 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                if (!lkb->lkb_lvbptr)
-                       lkb->lkb_lvbptr = allocate_lvb(ls);
+                       lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                len = receive_extralen(ms);
@@ -3010,7 +3012,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                /* lkb was just created so there won't be an lvb yet */
-               lkb->lkb_lvbptr = allocate_lvb(ls);
+               lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
        }
@@ -3383,6 +3385,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
                if (is_overlap(lkb)) {
                        /* we'll ignore error in cancel/unlock reply */
                        queue_cast_overlap(r, lkb);
+                       confirm_master(r, result);
                        unhold_lkb(lkb); /* undoes create_lkb() */
                } else
                        _request_lock(r, lkb);
@@ -3640,6 +3643,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
 {
+       if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
+               log_debug(ls, "ignore non-member message %d from %d %x %x %d",
+                         ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_remid, ms->m_result);
+               return;
+       }
+
        switch (ms->m_type) {
 
        /* messages sent to a master node */
@@ -3847,6 +3857,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
 void dlm_recover_waiters_pre(struct dlm_ls *ls)
 {
        struct dlm_lkb *lkb, *safe;
+       int wait_type, stub_unlock_result, stub_cancel_result;
 
        mutex_lock(&ls->ls_waiters_mutex);
 
@@ -3865,7 +3876,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                if (!waiter_needs_recovery(ls, lkb))
                        continue;
 
-               switch (lkb->lkb_wait_type) {
+               wait_type = lkb->lkb_wait_type;
+               stub_unlock_result = -DLM_EUNLOCK;
+               stub_cancel_result = -DLM_ECANCEL;
+
+               /* Main reply may have been received leaving a zero wait_type,
+                  but a reply for the overlapping op may not have been
+                  received.  In that case we need to fake the appropriate
+                  reply for the overlap op. */
+
+               if (!wait_type) {
+                       if (is_overlap_cancel(lkb)) {
+                               wait_type = DLM_MSG_CANCEL;
+                               if (lkb->lkb_grmode == DLM_LOCK_IV)
+                                       stub_cancel_result = 0;
+                       }
+                       if (is_overlap_unlock(lkb)) {
+                               wait_type = DLM_MSG_UNLOCK;
+                               if (lkb->lkb_grmode == DLM_LOCK_IV)
+                                       stub_unlock_result = -ENOENT;
+                       }
+
+                       log_debug(ls, "rwpre overlap %x %x %d %d %d",
+                                 lkb->lkb_id, lkb->lkb_flags, wait_type,
+                                 stub_cancel_result, stub_unlock_result);
+               }
+
+               switch (wait_type) {
 
                case DLM_MSG_REQUEST:
                        lkb->lkb_flags |= DLM_IFL_RESEND;
@@ -3878,7 +3915,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_UNLOCK:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
-                       ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+                       ls->ls_stub_ms.m_result = stub_unlock_result;
                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
@@ -3887,15 +3924,15 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_CANCEL:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
-                       ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+                       ls->ls_stub_ms.m_result = stub_cancel_result;
                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
                        break;
 
                default:
-                       log_error(ls, "invalid lkb wait_type %d",
-                                 lkb->lkb_wait_type);
+                       log_error(ls, "invalid lkb wait_type %d %d",
+                                 lkb->lkb_wait_type, wait_type);
                }
                schedule();
        }
@@ -4184,7 +4221,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
-               lkb->lkb_lvbptr = allocate_lvb(ls);
+               lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
@@ -4259,7 +4296,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        put_rsb(r);
  out:
        if (error)
-               log_print("recover_master_copy %d %x", error, rl->rl_lkid);
+               log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
        rl->rl_result = error;
        return error;
 }
@@ -4342,7 +4379,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
                }
        }
 
-       /* After ua is attached to lkb it will be freed by free_lkb().
+       /* After ua is attached to lkb it will be freed by dlm_free_lkb().
           When DLM_IFL_USER is set, the dlm knows that this is a userspace
           lock and that lkb_astparam is the dlm_user_args structure. */
 
@@ -4679,6 +4716,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
        }
 
        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+               lkb->lkb_ast_type = 0;
                list_del(&lkb->lkb_astqueue);
                dlm_put_lkb(lkb);
        }