]> err.no Git - linux-2.6/blobdiff - fs/dlm/lock.c
Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfashe...
[linux-2.6] / fs / dlm / lock.c
index b865a46059ddd27aac86dc05eec54c62865e00f6..d8d6e729f96b669b5a6ed16bfb92c776cfc4744c 100644 (file)
@@ -85,6 +85,7 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                    struct dlm_message *ms);
 static int receive_extralen(struct dlm_message *ms);
+static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
 
 /*
  * Lock compatibilty matrix - thanks Steve
@@ -223,6 +224,16 @@ static inline int is_demoted(struct dlm_lkb *lkb)
        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
 }
 
+static inline int is_altmode(struct dlm_lkb *lkb)
+{
+       return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
+}
+
+static inline int is_granted(struct dlm_lkb *lkb)
+{
+       return (lkb->lkb_status == DLM_LKSTS_GRANTED);
+}
+
 static inline int is_remote(struct dlm_rsb *r)
 {
        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
@@ -579,7 +590,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
        /* counter can roll over so we must verify lkid is not in use */
 
        while (lkid == 0) {
-               lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
+               lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
 
                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
                                    lkb_idtbl_list) {
@@ -600,8 +611,8 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 
 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
 {
-       uint16_t bucket = lkid & 0xFFFF;
        struct dlm_lkb *lkb;
+       uint16_t bucket = (lkid >> 16);
 
        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
                if (lkb->lkb_id == lkid)
@@ -613,7 +624,7 @@ static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 {
        struct dlm_lkb *lkb;
-       uint16_t bucket = lkid & 0xFFFF;
+       uint16_t bucket = (lkid >> 16);
 
        if (bucket >= ls->ls_lkbtbl_size)
                return -EBADSLT;
@@ -643,7 +654,7 @@ static void kill_lkb(struct kref *kref)
 
 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
-       uint16_t bucket = lkb->lkb_id & 0xFFFF;
+       uint16_t bucket = (lkb->lkb_id >> 16);
 
        write_lock(&ls->ls_lkbtbl[bucket].lock);
        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
@@ -1190,6 +1201,50 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
                queue_cast(r, lkb, 0);
 }
 
+/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
+   change the granted/requested modes.  We're munging things accordingly in
+   the process copy.
+   CONVDEADLK: our grmode may have been forced down to NL to resolve a
+   conversion deadlock
+   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
+   compatible with other granted locks */
+
+static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+       if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
+               log_print("munge_demoted %x invalid reply type %d",
+                         lkb->lkb_id, ms->m_type);
+               return;
+       }
+
+       if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
+               log_print("munge_demoted %x invalid modes gr %d rq %d",
+                         lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
+               return;
+       }
+
+       lkb->lkb_grmode = DLM_LOCK_NL;
+}
+
+static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+       if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
+           ms->m_type != DLM_MSG_GRANT) {
+               log_print("munge_altmode %x invalid reply type %d",
+                         lkb->lkb_id, ms->m_type);
+               return;
+       }
+
+       if (lkb->lkb_exflags & DLM_LKF_ALTPR)
+               lkb->lkb_rqmode = DLM_LOCK_PR;
+       else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
+               lkb->lkb_rqmode = DLM_LOCK_CW;
+       else {
+               log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
+               dlm_print_lkb(lkb);
+       }
+}
+
 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
 {
        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
@@ -1964,9 +2019,24 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
                goto out;
        }
 
-       if (can_be_queued(lkb)) {
-               if (is_demoted(lkb))
+       /* is_demoted() means the can_be_granted() above set the grmode
+          to NL, and left us on the granted queue.  This auto-demotion
+          (due to CONVDEADLK) might mean other locks, and/or this lock, are
+          now grantable.  We have to try to grant other converting locks
+          before we try again to grant this one. */
+
+       if (is_demoted(lkb)) {
+               grant_pending_convert(r, DLM_LOCK_IV);
+               if (_can_be_granted(r, lkb, 1)) {
+                       grant_lock(r, lkb);
+                       queue_cast(r, lkb, 0);
                        grant_pending_locks(r);
+                       goto out;
+               }
+               /* else fall through and move to convert queue */
+       }
+
+       if (can_be_queued(lkb)) {
                error = -EINPROGRESS;
                del_lkb(r, lkb);
                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
@@ -2301,31 +2371,14 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
  * receive_lookup_reply                send_lookup_reply
  */
 
-static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
-                         int to_nodeid, int mstype,
-                         struct dlm_message **ms_ret,
-                         struct dlm_mhandle **mh_ret)
+static int _create_message(struct dlm_ls *ls, int mb_len,
+                          int to_nodeid, int mstype,
+                          struct dlm_message **ms_ret,
+                          struct dlm_mhandle **mh_ret)
 {
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        char *mb;
-       int mb_len = sizeof(struct dlm_message);
-
-       switch (mstype) {
-       case DLM_MSG_REQUEST:
-       case DLM_MSG_LOOKUP:
-       case DLM_MSG_REMOVE:
-               mb_len += r->res_length;
-               break;
-       case DLM_MSG_CONVERT:
-       case DLM_MSG_UNLOCK:
-       case DLM_MSG_REQUEST_REPLY:
-       case DLM_MSG_CONVERT_REPLY:
-       case DLM_MSG_GRANT:
-               if (lkb && lkb->lkb_lvbptr)
-                       mb_len += r->res_ls->ls_lvblen;
-               break;
-       }
 
        /* get_buffer gives us a message handle (mh) that we need to
           pass into lowcomms_commit and a message buffer (mb) that we
@@ -2340,7 +2393,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
        ms = (struct dlm_message *) mb;
 
        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
-       ms->m_header.h_lockspace = r->res_ls->ls_global_id;
+       ms->m_header.h_lockspace = ls->ls_global_id;
        ms->m_header.h_nodeid = dlm_our_nodeid();
        ms->m_header.h_length = mb_len;
        ms->m_header.h_cmd = DLM_MSG;
@@ -2352,6 +2405,33 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
        return 0;
 }
 
+static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
+                         int to_nodeid, int mstype,
+                         struct dlm_message **ms_ret,
+                         struct dlm_mhandle **mh_ret)
+{
+       int mb_len = sizeof(struct dlm_message);
+
+       switch (mstype) {
+       case DLM_MSG_REQUEST:
+       case DLM_MSG_LOOKUP:
+       case DLM_MSG_REMOVE:
+               mb_len += r->res_length;
+               break;
+       case DLM_MSG_CONVERT:
+       case DLM_MSG_UNLOCK:
+       case DLM_MSG_REQUEST_REPLY:
+       case DLM_MSG_CONVERT_REPLY:
+       case DLM_MSG_GRANT:
+               if (lkb && lkb->lkb_lvbptr)
+                       mb_len += r->res_ls->ls_lvblen;
+               break;
+       }
+
+       return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
+                              ms_ret, mh_ret);
+}
+
 /* further lowcomms enhancements or alternate implementations may make
    the return value from this function useful at some point */
 
@@ -2897,6 +2977,8 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
        lock_rsb(r);
 
        receive_flags_reply(lkb, ms);
+       if (is_altmode(lkb))
+               munge_altmode(lkb, ms);
        grant_lock_pc(r, lkb, ms);
        queue_cast(r, lkb, 0);
 
@@ -2977,6 +3059,11 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
        dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
 }
 
+static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
+{
+       do_purge(ls, ms->m_nodeid, ms->m_pid);
+}
+
 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
@@ -3022,6 +3109,8 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
                /* request was queued or granted on remote master */
                receive_flags_reply(lkb, ms);
                lkb->lkb_remid = ms->m_lkid;
+               if (is_altmode(lkb))
+                       munge_altmode(lkb, ms);
                if (result)
                        add_lkb(r, lkb, DLM_LKSTS_WAITING);
                else {
@@ -3085,6 +3174,9 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 
        case -EINPROGRESS:
                /* convert was queued on remote master */
+               receive_flags_reply(lkb, ms);
+               if (is_demoted(lkb))
+                       munge_demoted(lkb, ms);
                del_lkb(r, lkb);
                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
                break;
@@ -3092,6 +3184,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
        case 0:
                /* convert was granted on remote master */
                receive_flags_reply(lkb, ms);
+               if (is_demoted(lkb))
+                       munge_demoted(lkb, ms);
                grant_lock_pc(r, lkb, ms);
                queue_cast(r, lkb, 0);
                break;
@@ -3399,6 +3493,12 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
                receive_lookup_reply(ls, ms);
                break;
 
+       /* other messages */
+
+       case DLM_MSG_PURGE:
+               receive_purge(ls, ms);
+               break;
+
        default:
                log_error(ls, "unknown message type %d", ms->m_type);
        }
@@ -4250,3 +4350,92 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
        unlock_recovery(ls);
 }
 
+static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
+{
+       struct dlm_lkb *lkb, *safe;
+
+       while (1) {
+               lkb = NULL;
+               spin_lock(&proc->locks_spin);
+               if (!list_empty(&proc->locks)) {
+                       lkb = list_entry(proc->locks.next, struct dlm_lkb,
+                                        lkb_ownqueue);
+                       list_del_init(&lkb->lkb_ownqueue);
+               }
+               spin_unlock(&proc->locks_spin);
+
+               if (!lkb)
+                       break;
+
+               lkb->lkb_flags |= DLM_IFL_DEAD;
+               unlock_proc_lock(ls, lkb);
+               dlm_put_lkb(lkb); /* ref from proc->locks list */
+       }
+
+       spin_lock(&proc->locks_spin);
+       list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
+               list_del_init(&lkb->lkb_ownqueue);
+               lkb->lkb_flags |= DLM_IFL_DEAD;
+               dlm_put_lkb(lkb);
+       }
+       spin_unlock(&proc->locks_spin);
+
+       spin_lock(&proc->asts_spin);
+       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+               list_del(&lkb->lkb_astqueue);
+               dlm_put_lkb(lkb);
+       }
+       spin_unlock(&proc->asts_spin);
+}
+
+/* pid of 0 means purge all orphans */
+
+static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
+{
+       struct dlm_lkb *lkb, *safe;
+
+       mutex_lock(&ls->ls_orphans_mutex);
+       list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
+               if (pid && lkb->lkb_ownpid != pid)
+                       continue;
+               unlock_proc_lock(ls, lkb);
+               list_del_init(&lkb->lkb_ownqueue);
+               dlm_put_lkb(lkb);
+       }
+       mutex_unlock(&ls->ls_orphans_mutex);
+}
+
+static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
+{
+       struct dlm_message *ms;
+       struct dlm_mhandle *mh;
+       int error;
+
+       error = _create_message(ls, sizeof(struct dlm_message), nodeid,
+                               DLM_MSG_PURGE, &ms, &mh);
+       if (error)
+               return error;
+       ms->m_nodeid = nodeid;
+       ms->m_pid = pid;
+
+       return send_message(mh, ms);
+}
+
+int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
+                  int nodeid, int pid)
+{
+       int error = 0;
+
+       if (nodeid != dlm_our_nodeid()) {
+               error = send_purge(ls, nodeid, pid);
+       } else {
+               lock_recovery(ls);
+               if (pid == current->pid)
+                       purge_proc_locks(ls, proc);
+               else
+                       do_purge(ls, nodeid, pid);
+               unlock_recovery(ls);
+       }
+       return error;
+}
+