X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=fs%2Fdlm%2Flock.c;h=2082daf083d86cdec3b3697e061f8f3ff703e867;hb=ed72df448250a6da72b65e7881eb63c5ded3475f;hp=e725005fafd024cd41de5bdb9c0271497a7d2e1f;hpb=58a3bb59973e33a428d72fa530a3d1d81feb0e8f;p=linux-2.6 diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index e725005faf..2082daf083 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -82,9 +82,13 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode); static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); static int send_remove(struct dlm_rsb *r); static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); +static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms); static int receive_extralen(struct dlm_message *ms); +static void do_purge(struct dlm_ls *ls, int nodeid, int pid); +static void del_timeout(struct dlm_lkb *lkb); +void dlm_timeout_warn(struct dlm_lkb *lkb); /* * Lock compatibilty matrix - thanks Steve @@ -193,17 +197,17 @@ void dlm_dump_rsb(struct dlm_rsb *r) /* Threads cannot use the lockspace while it's being recovered */ -static inline void lock_recovery(struct dlm_ls *ls) +static inline void dlm_lock_recovery(struct dlm_ls *ls) { down_read(&ls->ls_in_recovery); } -static inline void unlock_recovery(struct dlm_ls *ls) +void dlm_unlock_recovery(struct dlm_ls *ls) { up_read(&ls->ls_in_recovery); } -static inline int lock_recovery_try(struct dlm_ls *ls) +int dlm_lock_recovery_try(struct dlm_ls *ls) { return down_read_trylock(&ls->ls_in_recovery); } @@ -223,6 +227,16 @@ static inline int is_demoted(struct dlm_lkb *lkb) return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); } +static inline int is_altmode(struct dlm_lkb *lkb) +{ + return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); +} + +static inline int is_granted(struct dlm_lkb *lkb) +{ + return (lkb->lkb_status == DLM_LKSTS_GRANTED); +} + static inline int is_remote(struct dlm_rsb *r) { DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); @@ -254,19 +268,55 @@ static inline int down_conversion(struct dlm_lkb *lkb) return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); } +static inline int is_overlap_unlock(struct dlm_lkb *lkb) +{ + return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; +} + +static inline int is_overlap_cancel(struct dlm_lkb *lkb) +{ + return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; +} + +static inline int is_overlap(struct dlm_lkb *lkb) +{ + return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | + DLM_IFL_OVERLAP_CANCEL)); +} + static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) { if (is_master_copy(lkb)) return; + del_timeout(lkb); + DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); + /* if the operation was a cancel, then return -DLM_ECANCEL, if a + timeout caused the cancel then return -ETIMEDOUT */ + if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) { + lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL; + rv = -ETIMEDOUT; + } + + if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { + lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; + rv = -EDEADLK; + } + lkb->lkb_lksb->sb_status = rv; lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags; dlm_add_ast(lkb, AST_COMP); } +static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) +{ + queue_cast(r, lkb, + is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL); +} + static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) { if (is_master_copy(lkb)) @@ -547,6 +597,8 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) lkb->lkb_grmode = DLM_LOCK_IV; kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); + INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); + INIT_LIST_HEAD(&lkb->lkb_time_list); get_random_bytes(&bucket, sizeof(bucket)); bucket &= (ls->ls_lkbtbl_size - 1); @@ -556,7 +608,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) /* counter can roll over so we must verify lkid is not in use */ while (lkid == 0) { - lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16); + lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++; list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { @@ -577,8 +629,8 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) { - uint16_t bucket = lkid & 0xFFFF; struct dlm_lkb *lkb; + uint16_t bucket = (lkid >> 16); list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { if (lkb->lkb_id == lkid) @@ -590,7 +642,7 @@ static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - uint16_t bucket = lkid & 0xFFFF; + uint16_t bucket = (lkid >> 16); if (bucket >= ls->ls_lkbtbl_size) return -EBADSLT; @@ -620,7 +672,7 @@ static void kill_lkb(struct kref *kref) static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) { - uint16_t bucket = lkb->lkb_id & 0xFFFF; + uint16_t bucket = (lkb->lkb_id >> 16); write_lock(&ls->ls_lkbtbl[bucket].lock); if (kref_put(&lkb->lkb_ref, kill_lkb)) { @@ -735,23 +787,75 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) unhold_lkb(lkb); } +static int msg_reply_type(int mstype) +{ + switch (mstype) { + case DLM_MSG_REQUEST: + return DLM_MSG_REQUEST_REPLY; + case DLM_MSG_CONVERT: + return DLM_MSG_CONVERT_REPLY; + case DLM_MSG_UNLOCK: + return DLM_MSG_UNLOCK_REPLY; + case DLM_MSG_CANCEL: + return DLM_MSG_CANCEL_REPLY; + case DLM_MSG_LOOKUP: + return DLM_MSG_LOOKUP_REPLY; + } + return -1; +} + /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static void add_to_waiters(struct dlm_lkb *lkb, int mstype) +static int add_to_waiters(struct dlm_lkb *lkb, int mstype) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int error = 0; mutex_lock(&ls->ls_waiters_mutex); - if (lkb->lkb_wait_type) { - log_print("add_to_waiters error %d", lkb->lkb_wait_type); + + if (is_overlap_unlock(lkb) || + (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { + error = -EINVAL; + goto out; + } + + if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { + switch (mstype) { + case DLM_MSG_UNLOCK: + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + break; + case DLM_MSG_CANCEL: + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + break; + default: + error = -EBUSY; + goto out; + } + lkb->lkb_wait_count++; + hold_lkb(lkb); + + log_debug(ls, "add overlap %x cur %d new %d count %d flags %x", + lkb->lkb_id, lkb->lkb_wait_type, mstype, + lkb->lkb_wait_count, lkb->lkb_flags); goto out; } + + DLM_ASSERT(!lkb->lkb_wait_count, + dlm_print_lkb(lkb); + printk("wait_count %d\n", lkb->lkb_wait_count);); + + lkb->lkb_wait_count++; lkb->lkb_wait_type = mstype; - kref_get(&lkb->lkb_ref); + hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: + if (error) + log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s", + lkb->lkb_id, error, lkb->lkb_flags, mstype, + lkb->lkb_wait_type, lkb->lkb_resource->res_name); mutex_unlock(&ls->ls_waiters_mutex); + return error; } /* We clear the RESEND flag because we might be taking an lkb off the waiters @@ -759,34 +863,85 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype) request reply on the requestqueue) between dlm_recover_waiters_pre() which set RESEND and dlm_recover_waiters_post() */ -static int _remove_from_waiters(struct dlm_lkb *lkb) +static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) { - int error = 0; + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int overlap_done = 0; - if (!lkb->lkb_wait_type) { - log_print("remove_from_waiters error"); - error = -EINVAL; - goto out; + if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + overlap_done = 1; + goto out_del; + } + + if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + overlap_done = 1; + goto out_del; + } + + /* N.B. type of reply may not always correspond to type of original + msg due to lookup->request optimization, verify others? */ + + if (lkb->lkb_wait_type) { + lkb->lkb_wait_type = 0; + goto out_del; + } + + log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d", + lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type); + return -1; + + out_del: + /* the force-unlock/cancel has completed and we haven't recvd a reply + to the op that was in progress prior to the unlock/cancel; we + give up on any reply to the earlier op. FIXME: not sure when/how + this would happen */ + + if (overlap_done && lkb->lkb_wait_type) { + log_error(ls, "remove_from_waiters %x reply %d give up on %d", + lkb->lkb_id, mstype, lkb->lkb_wait_type); + lkb->lkb_wait_count--; + lkb->lkb_wait_type = 0; } - lkb->lkb_wait_type = 0; + + DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); + lkb->lkb_flags &= ~DLM_IFL_RESEND; - list_del(&lkb->lkb_wait_reply); + lkb->lkb_wait_count--; + if (!lkb->lkb_wait_count) + list_del_init(&lkb->lkb_wait_reply); unhold_lkb(lkb); - out: - return error; + return 0; } -static int remove_from_waiters(struct dlm_lkb *lkb) +static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; mutex_lock(&ls->ls_waiters_mutex); - error = _remove_from_waiters(lkb); + error = _remove_from_waiters(lkb, mstype); mutex_unlock(&ls->ls_waiters_mutex); return error; } +/* Handles situations where we might be processing a "fake" or "stub" reply in + which we can't try to take waiters_mutex again. */ + +static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int error; + + if (ms != &ls->ls_stub_ms) + mutex_lock(&ls->ls_waiters_mutex); + error = _remove_from_waiters(lkb, ms->m_type); + if (ms != &ls->ls_stub_ms) + mutex_unlock(&ls->ls_waiters_mutex); + return error; +} + static void dir_remove(struct dlm_rsb *r) { int to_nodeid; @@ -848,15 +1003,136 @@ void dlm_scan_rsbs(struct dlm_ls *ls) { int i; - if (dlm_locking_stopped(ls)) - return; - for (i = 0; i < ls->ls_rsbtbl_size; i++) { shrink_bucket(ls, i); + if (dlm_locking_stopped(ls)) + break; cond_resched(); } } +static void add_timeout(struct dlm_lkb *lkb) +{ + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + + if (is_master_copy(lkb)) { + lkb->lkb_timestamp = jiffies; + return; + } + + if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) && + !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { + lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN; + goto add_it; + } + if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) + goto add_it; + return; + + add_it: + DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb);); + mutex_lock(&ls->ls_timeout_mutex); + hold_lkb(lkb); + lkb->lkb_timestamp = jiffies; + list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout); + mutex_unlock(&ls->ls_timeout_mutex); +} + +static void del_timeout(struct dlm_lkb *lkb) +{ + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + + mutex_lock(&ls->ls_timeout_mutex); + if (!list_empty(&lkb->lkb_time_list)) { + list_del_init(&lkb->lkb_time_list); + unhold_lkb(lkb); + } + mutex_unlock(&ls->ls_timeout_mutex); +} + +/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and + lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex + and then lock rsb because of lock ordering in add_timeout. We may need + to specify some special timeout-related bits in the lkb that are just to + be accessed under the timeout_mutex. */ + +void dlm_scan_timeout(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + struct dlm_lkb *lkb; + int do_cancel, do_warn; + + for (;;) { + if (dlm_locking_stopped(ls)) + break; + + do_cancel = 0; + do_warn = 0; + mutex_lock(&ls->ls_timeout_mutex); + list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) { + + if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) && + time_after_eq(jiffies, lkb->lkb_timestamp + + lkb->lkb_timeout_cs * HZ/100)) + do_cancel = 1; + + if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && + time_after_eq(jiffies, lkb->lkb_timestamp + + dlm_config.ci_timewarn_cs * HZ/100)) + do_warn = 1; + + if (!do_cancel && !do_warn) + continue; + hold_lkb(lkb); + break; + } + mutex_unlock(&ls->ls_timeout_mutex); + + if (!do_cancel && !do_warn) + break; + + r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + + if (do_warn) { + /* clear flag so we only warn once */ + lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; + if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT)) + del_timeout(lkb); + dlm_timeout_warn(lkb); + } + + if (do_cancel) { + log_debug(ls, "timeout cancel %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, r->res_name); + lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; + lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL; + del_timeout(lkb); + _cancel_lock(r, lkb); + } + + unlock_rsb(r); + unhold_rsb(r); + dlm_put_lkb(lkb); + } +} + +/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping + dlm_recoverd before checking/setting ls_recover_begin. */ + +void dlm_adjust_timeouts(struct dlm_ls *ls) +{ + struct dlm_lkb *lkb; + long adj = jiffies - ls->ls_recover_begin; + + ls->ls_recover_begin = 0; + mutex_lock(&ls->ls_timeout_mutex); + list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) + lkb->lkb_timestamp += adj; + mutex_unlock(&ls->ls_timeout_mutex); +} + /* lkb is master or local copy */ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -988,8 +1264,14 @@ static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) _remove_lock(r, lkb); } -static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) +/* returns: 0 did nothing + 1 moved lock to granted + -1 removed lock */ + +static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) { + int rv = 0; + lkb->lkb_rqmode = DLM_LOCK_IV; switch (lkb->lkb_status) { @@ -997,6 +1279,7 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) break; case DLM_LKSTS_CONVERT: move_lkb(r, lkb, DLM_LKSTS_GRANTED); + rv = 1; break; case DLM_LKSTS_WAITING: del_lkb(r, lkb); @@ -1004,15 +1287,17 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) /* this unhold undoes the original ref from create_lkb() so this leads to the lkb being freed */ unhold_lkb(lkb); + rv = -1; break; default: log_print("invalid status for revert %d", lkb->lkb_status); } + return rv; } -static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) +static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) { - revert_lock(r, lkb); + return revert_lock(r, lkb); } static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -1055,6 +1340,50 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) queue_cast(r, lkb, 0); } +/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to + change the granted/requested modes. We're munging things accordingly in + the process copy. + CONVDEADLK: our grmode may have been forced down to NL to resolve a + conversion deadlock + ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become + compatible with other granted locks */ + +static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + if (ms->m_type != DLM_MSG_CONVERT_REPLY) { + log_print("munge_demoted %x invalid reply type %d", + lkb->lkb_id, ms->m_type); + return; + } + + if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { + log_print("munge_demoted %x invalid modes gr %d rq %d", + lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); + return; + } + + lkb->lkb_grmode = DLM_LOCK_NL; +} + +static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + if (ms->m_type != DLM_MSG_REQUEST_REPLY && + ms->m_type != DLM_MSG_GRANT) { + log_print("munge_altmode %x invalid reply type %d", + lkb->lkb_id, ms->m_type); + return; + } + + if (lkb->lkb_exflags & DLM_LKF_ALTPR) + lkb->lkb_rqmode = DLM_LOCK_PR; + else if (lkb->lkb_exflags & DLM_LKF_ALTCW) + lkb->lkb_rqmode = DLM_LOCK_CW; + else { + log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); + dlm_print_lkb(lkb); + } +} + static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) { struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, @@ -1085,10 +1414,8 @@ static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) * queue for one resource. The granted mode of each lock blocks the requested * mode of the other lock." * - * Part 2: if the granted mode of lkb is preventing the first lkb in the - * convert queue from being granted, then demote lkb (set grmode to NL). - * This second form requires that we check for conv-deadlk even when - * now == 0 in _can_be_granted(). + * Part 2: if the granted mode of lkb is preventing an earlier lkb in the + * convert queue from being granted, then deadlk/demote lkb. * * Example: * Granted Queue: empty @@ -1097,41 +1424,52 @@ static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) * * The first lock can't be granted because of the granted mode of the second * lock and the second lock can't be granted because it's not first in the - * list. We demote the granted mode of the second lock (the lkb passed to this - * function). + * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we + * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK + * flag set and return DEMOTED in the lksb flags. * - * After the resolution, the "grant pending" function needs to go back and try - * to grant locks on the convert queue again since the first lock can now be - * granted. + * Originally, this function detected conv-deadlk in a more limited scope: + * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or + * - if lkb1 was the first entry in the queue (not just earlier), and was + * blocked by the granted mode of lkb2, and there was nothing on the + * granted queue preventing lkb1 from being granted immediately, i.e. + * lkb2 was the only thing preventing lkb1 from being granted. + * + * That second condition meant we'd only say there was conv-deadlk if + * resolving it (by demotion) would lead to the first lock on the convert + * queue being granted right away. It allowed conversion deadlocks to exist + * between locks on the convert queue while they couldn't be granted anyway. + * + * Now, we detect and take action on conversion deadlocks immediately when + * they're created, even if they may not be immediately consequential. If + * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted + * mode that would prevent lkb1's conversion from being granted, we do a + * deadlk/demote on lkb2 right away and don't let it onto the convert queue. + * I think this means that the lkb_is_ahead condition below should always + * be zero, i.e. there will never be conv-deadlk between two locks that are + * both already on the convert queue. */ -static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb) +static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) { - struct dlm_lkb *this, *first = NULL, *self = NULL; + struct dlm_lkb *lkb1; + int lkb_is_ahead = 0; - list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) { - if (!first) - first = this; - if (this == lkb) { - self = lkb; + list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { + if (lkb1 == lkb2) { + lkb_is_ahead = 1; continue; } - if (!modes_compat(this, lkb) && !modes_compat(lkb, this)) - return 1; - } - - /* if lkb is on the convert queue and is preventing the first - from being granted, then there's deadlock and we demote lkb. - multiple converting locks may need to do this before the first - converting lock can be granted. */ - - if (self && self != first) { - if (!modes_compat(lkb, first) && - !queue_conflict(&rsb->res_grantqueue, first)) - return 1; + if (!lkb_is_ahead) { + if (!modes_compat(lkb2, lkb1)) + return 1; + } else { + if (!modes_compat(lkb2, lkb1) && + !modes_compat(lkb1, lkb2)) + return 1; + } } - return 0; } @@ -1260,42 +1598,57 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) if (!now && !conv && list_empty(&r->res_convertqueue) && first_in_list(lkb, &r->res_waitqueue)) return 1; - out: - /* - * The following, enabled by CONVDEADLK, departs from VMS. - */ - - if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) && - conversion_deadlock_detect(r, lkb)) { - lkb->lkb_grmode = DLM_LOCK_NL; - lkb->lkb_sbflags |= DLM_SBF_DEMOTED; - } - return 0; } -/* - * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a - * simple way to provide a big optimization to applications that can use them. - */ - -static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) +static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, + int *err) { - uint32_t flags = lkb->lkb_exflags; int rv; int8_t alt = 0, rqmode = lkb->lkb_rqmode; + int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); + + if (err) + *err = 0; rv = _can_be_granted(r, lkb, now); if (rv) goto out; - if (lkb->lkb_sbflags & DLM_SBF_DEMOTED) + /* + * The CONVDEADLK flag is non-standard and tells the dlm to resolve + * conversion deadlocks by demoting grmode to NL, otherwise the dlm + * cancels one of the locks. + */ + + if (is_convert && can_be_queued(lkb) && + conversion_deadlock_detect(r, lkb)) { + if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { + lkb->lkb_grmode = DLM_LOCK_NL; + lkb->lkb_sbflags |= DLM_SBF_DEMOTED; + } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { + if (err) + *err = -EDEADLK; + else { + log_print("can_be_granted deadlock %x now %d", + lkb->lkb_id, now); + dlm_dump_rsb(r); + } + } goto out; + } + + /* + * The ALTPR and ALTCW flags are non-standard and tell the dlm to try + * to grant a request in a mode other than the normal rqmode. It's a + * simple way to provide a big optimization to applications that can + * use them. + */ - if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR) + if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) alt = DLM_LOCK_PR; - else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW) + else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) alt = DLM_LOCK_CW; if (alt) { @@ -1310,10 +1663,21 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) return rv; } -static int grant_pending_convert(struct dlm_rsb *r, int high) +/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock + for locks pending on the convert list. Once verified (watch for these + log_prints), we should be able to just call _can_be_granted() and not + bother with the demote/deadlk cases here (and there's no easy way to deal + with a deadlk here, we'd have to generate something like grant_lock with + the deadlk error.) */ + +/* Returns the highest requested mode of all blocked conversions; sets + cw if there's a blocked conversion to DLM_LOCK_CW. */ + +static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) { struct dlm_lkb *lkb, *s; int hi, demoted, quit, grant_restart, demote_restart; + int deadlk; quit = 0; restart: @@ -1323,14 +1687,32 @@ static int grant_pending_convert(struct dlm_rsb *r, int high) list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { demoted = is_demoted(lkb); - if (can_be_granted(r, lkb, 0)) { + deadlk = 0; + + if (can_be_granted(r, lkb, 0, &deadlk)) { grant_lock_pending(r, lkb); grant_restart = 1; - } else { - hi = max_t(int, lkb->lkb_rqmode, hi); - if (!demoted && is_demoted(lkb)) - demote_restart = 1; + continue; + } + + if (!demoted && is_demoted(lkb)) { + log_print("WARN: pending demoted %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, r->res_name); + demote_restart = 1; + continue; } + + if (deadlk) { + log_print("WARN: pending deadlock %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, r->res_name); + dlm_dump_rsb(r); + continue; + } + + hi = max_t(int, lkb->lkb_rqmode, hi); + + if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) + *cw = 1; } if (grant_restart) @@ -1343,29 +1725,52 @@ static int grant_pending_convert(struct dlm_rsb *r, int high) return max_t(int, high, hi); } -static int grant_pending_wait(struct dlm_rsb *r, int high) +static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) { struct dlm_lkb *lkb, *s; list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { - if (can_be_granted(r, lkb, 0)) + if (can_be_granted(r, lkb, 0, NULL)) grant_lock_pending(r, lkb); - else + else { high = max_t(int, lkb->lkb_rqmode, high); + if (lkb->lkb_rqmode == DLM_LOCK_CW) + *cw = 1; + } } return high; } +/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked + on either the convert or waiting queue. + high is the largest rqmode of all locks blocked on the convert or + waiting queue. */ + +static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) +{ + if (gr->lkb_grmode == DLM_LOCK_PR && cw) { + if (gr->lkb_highbast < DLM_LOCK_EX) + return 1; + return 0; + } + + if (gr->lkb_highbast < high && + !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) + return 1; + return 0; +} + static void grant_pending_locks(struct dlm_rsb *r) { struct dlm_lkb *lkb, *s; int high = DLM_LOCK_IV; + int cw = 0; DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); - high = grant_pending_convert(r, high); - high = grant_pending_wait(r, high); + high = grant_pending_convert(r, high, &cw); + high = grant_pending_wait(r, high, &cw); if (high == DLM_LOCK_IV) return; @@ -1373,27 +1778,41 @@ static void grant_pending_locks(struct dlm_rsb *r) /* * If there are locks left on the wait/convert queue then send blocking * ASTs to granted locks based on the largest requested mode (high) - * found above. FIXME: highbast < high comparison not valid for PR/CW. + * found above. */ list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { - if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) && - !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) { - queue_bast(r, lkb, high); + if (lkb->lkb_bastaddr && lock_requires_bast(lkb, high, cw)) { + if (cw && high == DLM_LOCK_PR) + queue_bast(r, lkb, DLM_LOCK_CW); + else + queue_bast(r, lkb, high); lkb->lkb_highbast = high; } } } +static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) +{ + if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || + (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { + if (gr->lkb_highbast < DLM_LOCK_EX) + return 1; + return 0; + } + + if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) + return 1; + return 0; +} + static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, struct dlm_lkb *lkb) { struct dlm_lkb *gr; list_for_each_entry(gr, head, lkb_statequeue) { - if (gr->lkb_bastaddr && - gr->lkb_highbast < lkb->lkb_rqmode && - !modes_compat(gr, lkb)) { + if (gr->lkb_bastaddr && modes_require_bast(gr, lkb)) { queue_bast(r, gr, lkb->lkb_rqmode); gr->lkb_highbast = lkb->lkb_rqmode; } @@ -1499,7 +1918,7 @@ static void process_lookup_list(struct dlm_rsb *r) struct dlm_lkb *lkb, *safe; list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { - list_del(&lkb->lkb_rsb_lookup); + list_del_init(&lkb->lkb_rsb_lookup); _request_lock(r, lkb); schedule(); } @@ -1530,7 +1949,7 @@ static void confirm_master(struct dlm_rsb *r, int error) if (!list_empty(&r->res_lookup)) { lkb = list_entry(r->res_lookup.next, struct dlm_lkb, lkb_rsb_lookup); - list_del(&lkb->lkb_rsb_lookup); + list_del_init(&lkb->lkb_rsb_lookup); r->res_first_lkid = lkb->lkb_id; _request_lock(r, lkb); } else @@ -1543,7 +1962,7 @@ static void confirm_master(struct dlm_rsb *r, int error) } static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, - int namelen, uint32_t parent_lkid, void *ast, + int namelen, unsigned long timeout_cs, void *ast, void *astarg, void *bast, struct dlm_args *args) { int rv = -EINVAL; @@ -1586,10 +2005,6 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) goto out; - /* parent/child locks not yet supported */ - if (parent_lkid) - goto out; - if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) goto out; @@ -1601,6 +2016,7 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, args->astaddr = ast; args->astparam = (long) astarg; args->bastaddr = bast; + args->timeout = timeout_cs; args->mode = mode; args->lksb = lksb; rv = 0; @@ -1614,6 +2030,9 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) DLM_LKF_FORCEUNLOCK)) return -EINVAL; + if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) + return -EINVAL; + args->flags = flags; args->astparam = (long) astarg; return 0; @@ -1638,6 +2057,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_wait_type) goto out; + + if (is_overlap(lkb)) + goto out; } lkb->lkb_exflags = args->flags; @@ -1649,40 +2071,138 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_lksb = args->lksb; lkb->lkb_lvbptr = args->lksb->sb_lvbptr; lkb->lkb_ownpid = (int) current->pid; + lkb->lkb_timeout_cs = args->timeout; rv = 0; out: return rv; } +/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 + for success */ + +/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here + because there may be a lookup in progress and it's valid to do + cancel/unlockf on it */ + static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) { + struct dlm_ls *ls = lkb->lkb_resource->res_ls; int rv = -EINVAL; - if (lkb->lkb_flags & DLM_IFL_MSTCPY) + if (lkb->lkb_flags & DLM_IFL_MSTCPY) { + log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); + dlm_print_lkb(lkb); goto out; + } - if (args->flags & DLM_LKF_FORCEUNLOCK) - goto out_ok; + /* an lkb may still exist even though the lock is EOL'ed due to a + cancel, unlock or failed noqueue request; an app can't use these + locks; return same error as if the lkid had not been found at all */ - if (args->flags & DLM_LKF_CANCEL && - lkb->lkb_status == DLM_LKSTS_GRANTED) + if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { + log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); + rv = -ENOENT; goto out; + } - if (!(args->flags & DLM_LKF_CANCEL) && - lkb->lkb_status != DLM_LKSTS_GRANTED) - goto out; + /* an lkb may be waiting for an rsb lookup to complete where the + lookup was initiated by another lock */ + + if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { + if (!list_empty(&lkb->lkb_rsb_lookup)) { + log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); + list_del_init(&lkb->lkb_rsb_lookup); + queue_cast(lkb->lkb_resource, lkb, + args->flags & DLM_LKF_CANCEL ? + -DLM_ECANCEL : -DLM_EUNLOCK); + unhold_lkb(lkb); /* undoes create_lkb() */ + rv = -EBUSY; + goto out; + } + } - rv = -EBUSY; - if (lkb->lkb_wait_type) - goto out; + /* cancel not allowed with another cancel/unlock in progress */ - out_ok: - lkb->lkb_exflags = args->flags; - lkb->lkb_sbflags = 0; - lkb->lkb_astparam = args->astparam; + if (args->flags & DLM_LKF_CANCEL) { + if (lkb->lkb_exflags & DLM_LKF_CANCEL) + goto out; - rv = 0; - out: + if (is_overlap(lkb)) + goto out; + + /* don't let scand try to do a cancel */ + del_timeout(lkb); + + if (lkb->lkb_flags & DLM_IFL_RESEND) { + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + rv = -EBUSY; + goto out; + } + + switch (lkb->lkb_wait_type) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + rv = -EBUSY; + goto out; + case DLM_MSG_UNLOCK: + case DLM_MSG_CANCEL: + goto out; + } + /* add_to_waiters() will set OVERLAP_CANCEL */ + goto out_ok; + } + + /* do we need to allow a force-unlock if there's a normal unlock + already in progress? in what conditions could the normal unlock + fail such that we'd want to send a force-unlock to be sure? */ + + if (args->flags & DLM_LKF_FORCEUNLOCK) { + if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) + goto out; + + if (is_overlap_unlock(lkb)) + goto out; + + /* don't let scand try to do a cancel */ + del_timeout(lkb); + + if (lkb->lkb_flags & DLM_IFL_RESEND) { + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + rv = -EBUSY; + goto out; + } + + switch (lkb->lkb_wait_type) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + rv = -EBUSY; + goto out; + case DLM_MSG_UNLOCK: + goto out; + } + /* add_to_waiters() will set OVERLAP_UNLOCK */ + goto out_ok; + } + + /* normal unlock not allowed if there's any op in progress */ + rv = -EBUSY; + if (lkb->lkb_wait_type || lkb->lkb_wait_count) + goto out; + + out_ok: + /* an overlapping op shouldn't blow away exflags from other op */ + lkb->lkb_exflags |= args->flags; + lkb->lkb_sbflags = 0; + lkb->lkb_astparam = args->astparam; + rv = 0; + out: + if (rv) + log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, + lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, + args->flags, lkb->lkb_wait_type, + lkb->lkb_resource->res_name); return rv; } @@ -1697,7 +2217,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error = 0; - if (can_be_granted(r, lkb, 1)) { + if (can_be_granted(r, lkb, 1, NULL)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -1707,6 +2227,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) error = -EINPROGRESS; add_lkb(r, lkb, DLM_LKSTS_WAITING); send_blocking_asts(r, lkb); + add_timeout(lkb); goto out; } @@ -1722,23 +2243,55 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error = 0; + int deadlk = 0; /* changing an existing lock may allow others to be granted */ - if (can_be_granted(r, lkb, 1)) { + if (can_be_granted(r, lkb, 1, &deadlk)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); grant_pending_locks(r); goto out; } - if (can_be_queued(lkb)) { - if (is_demoted(lkb)) + /* can_be_granted() detected that this lock would block in a conversion + deadlock, so we leave it on the granted queue and return EDEADLK in + the ast for the convert. */ + + if (deadlk) { + /* it's left on the granted queue */ + log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s", + lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status, + lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name); + revert_lock(r, lkb); + queue_cast(r, lkb, -EDEADLK); + error = -EDEADLK; + goto out; + } + + /* is_demoted() means the can_be_granted() above set the grmode + to NL, and left us on the granted queue. This auto-demotion + (due to CONVDEADLK) might mean other locks, and/or this lock, are + now grantable. We have to try to grant other converting locks + before we try again to grant this one. */ + + if (is_demoted(lkb)) { + grant_pending_convert(r, DLM_LOCK_IV, NULL); + if (_can_be_granted(r, lkb, 1)) { + grant_lock(r, lkb); + queue_cast(r, lkb, 0); grant_pending_locks(r); + goto out; + } + /* else fall through and move to convert queue */ + } + + if (can_be_queued(lkb)) { error = -EINPROGRESS; del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); send_blocking_asts(r, lkb); + add_timeout(lkb); goto out; } @@ -1759,17 +2312,19 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) return -DLM_EUNLOCK; } -/* FIXME: if revert_lock() finds that the lkb is granted, we should - skip the queue_cast(ECANCEL). It indicates that the request/convert - completed (and queued a normal ast) just before the cancel; we don't - want to clobber the sb_result for the normal ast with ECANCEL. */ +/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) { - revert_lock(r, lkb); - queue_cast(r, lkb, -DLM_ECANCEL); - grant_pending_locks(r); - return -DLM_ECANCEL; + int error; + + error = revert_lock(r, lkb); + if (error) { + queue_cast(r, lkb, -DLM_ECANCEL); + grant_pending_locks(r); + return -DLM_ECANCEL; + } + return 0; } /* @@ -1970,7 +2525,7 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (!ls) return -EINVAL; - lock_recovery(ls); + dlm_lock_recovery(ls); if (convert) error = find_lkb(ls, lksb->sb_lkid, &lkb); @@ -1980,7 +2535,7 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (error) goto out; - error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast, + error = set_lock_args(mode, lksb, flags, namelen, 0, ast, astarg, bast, &args); if (error) goto out_put; @@ -1995,10 +2550,10 @@ int dlm_lock(dlm_lockspace_t *lockspace, out_put: if (convert || error) __put_lkb(ls, lkb); - if (error == -EAGAIN) + if (error == -EAGAIN || error == -EDEADLK) error = 0; out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); dlm_put_lockspace(ls); return error; } @@ -2018,7 +2573,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (!ls) return -EINVAL; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) @@ -2035,10 +2590,12 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) error = 0; + if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) + error = 0; out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); dlm_put_lockspace(ls); return error; } @@ -2065,37 +2622,20 @@ int dlm_unlock(dlm_lockspace_t *lockspace, * receive_lookup_reply send_lookup_reply */ -static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, - int to_nodeid, int mstype, - struct dlm_message **ms_ret, - struct dlm_mhandle **mh_ret) +static int _create_message(struct dlm_ls *ls, int mb_len, + int to_nodeid, int mstype, + struct dlm_message **ms_ret, + struct dlm_mhandle **mh_ret) { struct dlm_message *ms; struct dlm_mhandle *mh; char *mb; - int mb_len = sizeof(struct dlm_message); - - switch (mstype) { - case DLM_MSG_REQUEST: - case DLM_MSG_LOOKUP: - case DLM_MSG_REMOVE: - mb_len += r->res_length; - break; - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_REQUEST_REPLY: - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_GRANT: - if (lkb && lkb->lkb_lvbptr) - mb_len += r->res_ls->ls_lvblen; - break; - } /* get_buffer gives us a message handle (mh) that we need to pass into lowcomms_commit and a message buffer (mb) that we write our data into */ - mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb); + mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb); if (!mh) return -ENOBUFS; @@ -2104,7 +2644,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, ms = (struct dlm_message *) mb; ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); - ms->m_header.h_lockspace = r->res_ls->ls_global_id; + ms->m_header.h_lockspace = ls->ls_global_id; ms->m_header.h_nodeid = dlm_our_nodeid(); ms->m_header.h_length = mb_len; ms->m_header.h_cmd = DLM_MSG; @@ -2116,6 +2656,33 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, return 0; } +static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, + int to_nodeid, int mstype, + struct dlm_message **ms_ret, + struct dlm_mhandle **mh_ret) +{ + int mb_len = sizeof(struct dlm_message); + + switch (mstype) { + case DLM_MSG_REQUEST: + case DLM_MSG_LOOKUP: + case DLM_MSG_REMOVE: + mb_len += r->res_length; + break; + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_REQUEST_REPLY: + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_GRANT: + if (lkb && lkb->lkb_lvbptr) + mb_len += r->res_ls->ls_lvblen; + break; + } + + return _create_message(r->res_ls, mb_len, to_nodeid, mstype, + ms_ret, mh_ret); +} + /* further lowcomms enhancements or alternate implementations may make the return value from this function useful at some point */ @@ -2176,7 +2743,9 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) struct dlm_mhandle *mh; int to_nodeid, error; - add_to_waiters(lkb, mstype); + error = add_to_waiters(lkb, mstype); + if (error) + return error; to_nodeid = r->res_nodeid; @@ -2192,7 +2761,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) return 0; fail: - remove_from_waiters(lkb); + remove_from_waiters(lkb, msg_reply_type(mstype)); return error; } @@ -2209,7 +2778,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* down conversions go without a reply from the master */ if (!error && down_conversion(lkb)) { - remove_from_waiters(lkb); + remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); + r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; r->res_ls->ls_stub_ms.m_result = 0; r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); @@ -2280,7 +2850,9 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) struct dlm_mhandle *mh; int to_nodeid, error; - add_to_waiters(lkb, DLM_MSG_LOOKUP); + error = add_to_waiters(lkb, DLM_MSG_LOOKUP); + if (error) + return error; to_nodeid = dlm_dir_nodeid(r); @@ -2296,7 +2868,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) return 0; fail: - remove_from_waiters(lkb); + remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); return error; } @@ -2656,6 +3228,8 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) lock_rsb(r); receive_flags_reply(lkb, ms); + if (is_altmode(lkb)) + munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); @@ -2736,11 +3310,16 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); } +static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) +{ + do_purge(ls, ms->m_nodeid, ms->m_pid); +} + static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; - int error, mstype; + int error, mstype, result; error = find_lkb(ls, ms->m_remid, &lkb); if (error) { @@ -2749,20 +3328,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - mstype = lkb->lkb_wait_type; - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_request_reply not on waiters"); - goto out; - } - - /* this is the value returned from do_request() on the master */ - error = ms->m_result; - r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + mstype = lkb->lkb_wait_type; + error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); + if (error) + goto out; + /* Optimization: the dir node was also the master, so it took our lookup as a request and sent request reply instead of lookup reply */ if (mstype == DLM_MSG_LOOKUP) { @@ -2770,14 +3344,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_nodeid = r->res_nodeid; } - switch (error) { + /* this is the value returned from do_request() on the master */ + result = ms->m_result; + + switch (result) { case -EAGAIN: - /* request would block (be queued) on remote master; - the unhold undoes the original ref from create_lkb() - so it leads to the lkb being freed */ + /* request would block (be queued) on remote master */ queue_cast(r, lkb, -EAGAIN); confirm_master(r, -EAGAIN); - unhold_lkb(lkb); + unhold_lkb(lkb); /* undoes create_lkb() */ break; case -EINPROGRESS: @@ -2785,73 +3360,116 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) /* request was queued or granted on remote master */ receive_flags_reply(lkb, ms); lkb->lkb_remid = ms->m_lkid; - if (error) + if (is_altmode(lkb)) + munge_altmode(lkb, ms); + if (result) { add_lkb(r, lkb, DLM_LKSTS_WAITING); - else { + add_timeout(lkb); + } else { grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); } - confirm_master(r, error); + confirm_master(r, result); break; case -EBADR: case -ENOTBLK: /* find_rsb failed to find rsb or rsb wasn't master */ + log_debug(ls, "receive_request_reply %x %x master diff %d %d", + lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); r->res_nodeid = -1; lkb->lkb_nodeid = -1; - _request_lock(r, lkb); + + if (is_overlap(lkb)) { + /* we'll ignore error in cancel/unlock reply */ + queue_cast_overlap(r, lkb); + unhold_lkb(lkb); /* undoes create_lkb() */ + } else + _request_lock(r, lkb); break; default: - log_error(ls, "receive_request_reply error %d", error); + log_error(ls, "receive_request_reply %x error %d", + lkb->lkb_id, result); } + if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { + log_debug(ls, "receive_request_reply %x result %d unlock", + lkb->lkb_id, result); + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + send_unlock(r, lkb); + } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { + log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + send_cancel(r, lkb); + } else { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + } + out: unlock_rsb(r); put_rsb(r); - out: dlm_put_lkb(lkb); } static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms) { - int error = ms->m_result; - /* this is the value returned from do_convert() on the master */ - - switch (error) { + switch (ms->m_result) { case -EAGAIN: /* convert would block (be queued) on remote master */ queue_cast(r, lkb, -EAGAIN); break; + case -EDEADLK: + receive_flags_reply(lkb, ms); + revert_lock_pc(r, lkb); + queue_cast(r, lkb, -EDEADLK); + break; + case -EINPROGRESS: /* convert was queued on remote master */ + receive_flags_reply(lkb, ms); + if (is_demoted(lkb)) + munge_demoted(lkb, ms); del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); + add_timeout(lkb); break; case 0: /* convert was granted on remote master */ receive_flags_reply(lkb, ms); + if (is_demoted(lkb)) + munge_demoted(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); break; default: - log_error(r->res_ls, "receive_convert_reply error %d", error); + log_error(r->res_ls, "receive_convert_reply %x error %d", + lkb->lkb_id, ms->m_result); } } static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { struct dlm_rsb *r = lkb->lkb_resource; + int error; hold_rsb(r); lock_rsb(r); - __receive_convert_reply(r, lkb, ms); + /* stub reply can happen with waiters_mutex held */ + error = remove_from_waiters_ms(lkb, ms); + if (error) + goto out; + __receive_convert_reply(r, lkb, ms); + out: unlock_rsb(r); put_rsb(r); } @@ -2868,37 +3486,38 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_convert_reply not on waiters"); - goto out; - } - _receive_convert_reply(lkb, ms); - out: dlm_put_lkb(lkb); } static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { struct dlm_rsb *r = lkb->lkb_resource; - int error = ms->m_result; + int error; hold_rsb(r); lock_rsb(r); + /* stub reply can happen with waiters_mutex held */ + error = remove_from_waiters_ms(lkb, ms); + if (error) + goto out; + /* this is the value returned from do_unlock() on the master */ - switch (error) { + switch (ms->m_result) { case -DLM_EUNLOCK: receive_flags_reply(lkb, ms); remove_lock_pc(r, lkb); queue_cast(r, lkb, -DLM_EUNLOCK); break; + case -ENOENT: + break; default: - log_error(r->res_ls, "receive_unlock_reply error %d", error); + log_error(r->res_ls, "receive_unlock_reply %x error %d", + lkb->lkb_id, ms->m_result); } - + out: unlock_rsb(r); put_rsb(r); } @@ -2915,37 +3534,38 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_unlock_reply not on waiters"); - goto out; - } - _receive_unlock_reply(lkb, ms); - out: dlm_put_lkb(lkb); } static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { struct dlm_rsb *r = lkb->lkb_resource; - int error = ms->m_result; + int error; hold_rsb(r); lock_rsb(r); + /* stub reply can happen with waiters_mutex held */ + error = remove_from_waiters_ms(lkb, ms); + if (error) + goto out; + /* this is the value returned from do_cancel() on the master */ - switch (error) { + switch (ms->m_result) { case -DLM_ECANCEL: receive_flags_reply(lkb, ms); revert_lock_pc(r, lkb); queue_cast(r, lkb, -DLM_ECANCEL); break; + case 0: + break; default: - log_error(r->res_ls, "receive_cancel_reply error %d", error); + log_error(r->res_ls, "receive_cancel_reply %x error %d", + lkb->lkb_id, ms->m_result); } - + out: unlock_rsb(r); put_rsb(r); } @@ -2962,14 +3582,7 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_cancel_reply not on waiters"); - goto out; - } - _receive_cancel_reply(lkb, ms); - out: dlm_put_lkb(lkb); } @@ -2985,20 +3598,17 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) return; } - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_lookup_reply not on waiters"); - goto out; - } - - /* this is the value returned by dlm_dir_lookup on dir node + /* ms->m_result is the value returned by dlm_dir_lookup on dir node FIXME: will a non-zero error ever be returned? */ - error = ms->m_result; r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); + if (error) + goto out; + ret_nodeid = ms->m_nodeid; if (ret_nodeid == dlm_our_nodeid()) { r->res_nodeid = 0; @@ -3009,14 +3619,22 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) r->res_nodeid = ret_nodeid; } + if (is_overlap(lkb)) { + log_debug(ls, "receive_lookup_reply %x unlock %x", + lkb->lkb_id, lkb->lkb_flags); + queue_cast_overlap(r, lkb); + unhold_lkb(lkb); /* undoes create_lkb() */ + goto out_list; + } + _request_lock(r, lkb); + out_list: if (!ret_nodeid) process_lookup_list(r); - + out: unlock_rsb(r); put_rsb(r); - out: dlm_put_lkb(lkb); } @@ -3064,7 +3682,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) } } - if (lock_recovery_try(ls)) + if (dlm_lock_recovery_try(ls)) break; schedule(); } @@ -3133,11 +3751,17 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) receive_lookup_reply(ls, ms); break; + /* other messages */ + + case DLM_MSG_PURGE: + receive_purge(ls, ms); + break; + default: log_error(ls, "unknown message type %d", ms->m_type); } - unlock_recovery(ls); + dlm_unlock_recovery(ls); out: dlm_put_lockspace(ls); dlm_astd_wake(); @@ -3153,9 +3777,9 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) { if (middle_conversion(lkb)) { hold_lkb(lkb); + ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; ls->ls_stub_ms.m_result = -EINPROGRESS; ls->ls_stub_ms.m_flags = lkb->lkb_flags; - _remove_from_waiters(lkb); _receive_convert_reply(lkb, &ls->ls_stub_ms); /* Same special case as in receive_rcom_lock_args() */ @@ -3227,18 +3851,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); + ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; ls->ls_stub_ms.m_result = -DLM_EUNLOCK; ls->ls_stub_ms.m_flags = lkb->lkb_flags; - _remove_from_waiters(lkb); _receive_unlock_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; case DLM_MSG_CANCEL: hold_lkb(lkb); + ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; ls->ls_stub_ms.m_result = -DLM_ECANCEL; ls->ls_stub_ms.m_flags = lkb->lkb_flags; - _remove_from_waiters(lkb); _receive_cancel_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; @@ -3252,37 +3876,47 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) mutex_unlock(&ls->ls_waiters_mutex); } -static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) +static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) { struct dlm_lkb *lkb; - int rv = 0; + int found = 0; mutex_lock(&ls->ls_waiters_mutex); list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { if (lkb->lkb_flags & DLM_IFL_RESEND) { - rv = lkb->lkb_wait_type; - _remove_from_waiters(lkb); - lkb->lkb_flags &= ~DLM_IFL_RESEND; + hold_lkb(lkb); + found = 1; break; } } mutex_unlock(&ls->ls_waiters_mutex); - if (!rv) + if (!found) lkb = NULL; - *lkb_ret = lkb; - return rv; + return lkb; } /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the master or dir-node for r. Processing the lkb may result in it being placed back on waiters. */ +/* We do this after normal locking has been enabled and any saved messages + (in requestqueue) have been processed. We should be confident that at + this point we won't get or process a reply to any of these waiting + operations. But, new ops may be coming in on the rsbs/locks here from + userspace or remotely. */ + +/* there may have been an overlap unlock/cancel prior to recovery or after + recovery. if before, the lkb may still have a pos wait_count; if after, the + overlap flag would just have been set and nothing new sent. we can be + confident here than any replies to either the initial op or overlap ops + prior to recovery have been received. */ + int dlm_recover_waiters_post(struct dlm_ls *ls) { struct dlm_lkb *lkb; struct dlm_rsb *r; - int error = 0, mstype; + int error = 0, mstype, err, oc, ou; while (1) { if (dlm_locking_stopped(ls)) { @@ -3291,48 +3925,78 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) break; } - mstype = remove_resend_waiter(ls, &lkb); - if (!mstype) + lkb = find_resend_waiter(ls); + if (!lkb) break; r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + + mstype = lkb->lkb_wait_type; + oc = is_overlap_cancel(lkb); + ou = is_overlap_unlock(lkb); + err = 0; log_debug(ls, "recover_waiters_post %x type %d flags %x %s", lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); - switch (mstype) { - - case DLM_MSG_LOOKUP: - hold_rsb(r); - lock_rsb(r); - _request_lock(r, lkb); - if (is_master(r)) - confirm_master(r, 0); - unlock_rsb(r); - put_rsb(r); - break; - - case DLM_MSG_REQUEST: - hold_rsb(r); - lock_rsb(r); - _request_lock(r, lkb); - if (is_master(r)) - confirm_master(r, 0); - unlock_rsb(r); - put_rsb(r); - break; - - case DLM_MSG_CONVERT: - hold_rsb(r); - lock_rsb(r); - _convert_lock(r, lkb); - unlock_rsb(r); - put_rsb(r); - break; - - default: - log_error(ls, "recover_waiters_post type %d", mstype); + /* At this point we assume that we won't get a reply to any + previous op or overlap op on this lock. First, do a big + remove_from_waiters() for all previous ops. */ + + lkb->lkb_flags &= ~DLM_IFL_RESEND; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + lkb->lkb_wait_type = 0; + lkb->lkb_wait_count = 0; + mutex_lock(&ls->ls_waiters_mutex); + list_del_init(&lkb->lkb_wait_reply); + mutex_unlock(&ls->ls_waiters_mutex); + unhold_lkb(lkb); /* for waiters list */ + + if (oc || ou) { + /* do an unlock or cancel instead of resending */ + switch (mstype) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + queue_cast(r, lkb, ou ? -DLM_EUNLOCK : + -DLM_ECANCEL); + unhold_lkb(lkb); /* undoes create_lkb() */ + break; + case DLM_MSG_CONVERT: + if (oc) { + queue_cast(r, lkb, -DLM_ECANCEL); + } else { + lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; + _unlock_lock(r, lkb); + } + break; + default: + err = 1; + } + } else { + switch (mstype) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + _request_lock(r, lkb); + if (is_master(r)) + confirm_master(r, 0); + break; + case DLM_MSG_CONVERT: + _convert_lock(r, lkb); + break; + default: + err = 1; + } } + + if (err) + log_error(ls, "recover_waiters_post %x %d %x %d %d", + lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou); + unlock_rsb(r); + put_rsb(r); + dlm_put_lkb(lkb); } return error; @@ -3628,13 +4292,13 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode, uint32_t flags, void *name, unsigned int namelen, - uint32_t parent_lkid) + unsigned long timeout_cs) { struct dlm_lkb *lkb; struct dlm_args args; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = create_lkb(ls, &lkb); if (error) { @@ -3656,7 +4320,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, When DLM_IFL_USER is set, the dlm knows that this is a userspace lock and that lkb_astparam is the dlm_user_args structure. */ - error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid, + error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); lkb->lkb_flags |= DLM_IFL_USER; ua->old_mode = DLM_LOCK_IV; @@ -3684,23 +4348,24 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, /* add this new lkb to the per-process list of locks */ spin_lock(&ua->proc->locks_spin); - kref_get(&lkb->lkb_ref); + hold_lkb(lkb); list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); spin_unlock(&ua->proc->locks_spin); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); return error; } int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, - int mode, uint32_t flags, uint32_t lkid, char *lvb_in) + int mode, uint32_t flags, uint32_t lkid, char *lvb_in, + unsigned long timeout_cs) { struct dlm_lkb *lkb; struct dlm_args args; struct dlm_user_args *ua; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) @@ -3721,6 +4386,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (lvb_in && ua->lksb.sb_lvbptr) memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); + ua->xid = ua_tmp->xid; ua->castparam = ua_tmp->castparam; ua->castaddr = ua_tmp->castaddr; ua->bastparam = ua_tmp->bastparam; @@ -3728,19 +4394,19 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, ua->user_lksb = ua_tmp->user_lksb; ua->old_mode = lkb->lkb_grmode; - error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST, - ua, DLM_FAKE_USER_AST, &args); + error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, + DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); if (error) goto out_put; error = convert_lock(ls, lkb, &args); - if (error == -EINPROGRESS || error == -EAGAIN) + if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) error = 0; out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); kfree(ua_tmp); return error; } @@ -3753,7 +4419,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, struct dlm_user_args *ua; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) @@ -3774,6 +4440,9 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error == -DLM_EUNLOCK) error = 0; + /* from validate_unlock_args() */ + if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) + error = 0; if (error) goto out_put; @@ -3785,7 +4454,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); + kfree(ua_tmp); return error; } @@ -3797,7 +4467,7 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, struct dlm_user_args *ua; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) @@ -3815,33 +4485,85 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error == -DLM_ECANCEL) error = 0; + /* from validate_unlock_args() */ + if (error == -EBUSY) + error = 0; + out_put: + dlm_put_lkb(lkb); + out: + dlm_unlock_recovery(ls); + kfree(ua_tmp); + return error; +} + +int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) +{ + struct dlm_lkb *lkb; + struct dlm_args args; + struct dlm_user_args *ua; + struct dlm_rsb *r; + int error; + + dlm_lock_recovery(ls); + + error = find_lkb(ls, lkid, &lkb); + if (error) + goto out; + + ua = (struct dlm_user_args *)lkb->lkb_astparam; + + error = set_unlock_args(flags, ua, &args); if (error) goto out_put; - /* this lkb was removed from the WAITING queue */ - if (lkb->lkb_grmode == DLM_LOCK_IV) { - spin_lock(&ua->proc->locks_spin); - list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); - spin_unlock(&ua->proc->locks_spin); - } + /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */ + + r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + + error = validate_unlock_args(lkb, &args); + if (error) + goto out_r; + lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL; + + error = _cancel_lock(r, lkb); + out_r: + unlock_rsb(r); + put_rsb(r); + + if (error == -DLM_ECANCEL) + error = 0; + /* from validate_unlock_args() */ + if (error == -EBUSY) + error = 0; out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); return error; } +/* lkb's that are removed from the waiters list by revert are just left on the + orphans list with the granted orphan locks, to be freed by purge */ + static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; + struct dlm_args args; + int error; - if (ua->lksb.sb_lvbptr) - kfree(ua->lksb.sb_lvbptr); - kfree(ua); - lkb->lkb_astparam = (long)NULL; + hold_lkb(lkb); + mutex_lock(&ls->ls_orphans_mutex); + list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); + mutex_unlock(&ls->ls_orphans_mutex); - /* TODO: propogate to master if needed */ - return 0; + set_unlock_args(0, ua, &args); + + error = cancel_lock(ls, lkb, &args); + if (error == -DLM_ECANCEL) + error = 0; + return error; } /* The force flag allows the unlock to go ahead even if the lkb isn't granted. @@ -3853,10 +4575,6 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) struct dlm_args args; int error; - /* FIXME: we need to handle the case where the lkb is in limbo - while the rsb is being looked up, currently we assert in - _unlock_lock/is_remote because rsb nodeid is -1. */ - set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args); error = unlock_lock(ls, lkb, &args); @@ -3865,6 +4583,31 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) return error; } +/* We have to release clear_proc_locks mutex before calling unlock_proc_lock() + (which does lock_rsb) due to deadlock with receiving a message that does + lock_rsb followed by dlm_user_add_ast() */ + +static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, + struct dlm_user_proc *proc) +{ + struct dlm_lkb *lkb = NULL; + + mutex_lock(&ls->ls_clear_proc_locks); + if (list_empty(&proc->locks)) + goto out; + + lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue); + list_del_init(&lkb->lkb_ownqueue); + + if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) + lkb->lkb_flags |= DLM_IFL_ORPHAN; + else + lkb->lkb_flags |= DLM_IFL_DEAD; + out: + mutex_unlock(&ls->ls_clear_proc_locks); + return lkb; +} + /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, which we clear here. */ @@ -3879,19 +4622,17 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { struct dlm_lkb *lkb, *safe; - lock_recovery(ls); - mutex_lock(&ls->ls_clear_proc_locks); - - list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) { - list_del_init(&lkb->lkb_ownqueue); + dlm_lock_recovery(ls); - if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) { - lkb->lkb_flags |= DLM_IFL_ORPHAN; + while (1) { + lkb = del_proc_lock(ls, proc); + if (!lkb) + break; + del_timeout(lkb); + if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) orphan_proc_lock(ls, lkb); - } else { - lkb->lkb_flags |= DLM_IFL_DEAD; + else unlock_proc_lock(ls, lkb); - } /* this removes the reference for the proc->locks list added by dlm_user_request, it may result in the lkb @@ -3900,6 +4641,8 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } + mutex_lock(&ls->ls_clear_proc_locks); + /* in-progress unlocks */ list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { list_del_init(&lkb->lkb_ownqueue); @@ -3913,6 +4656,95 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) } mutex_unlock(&ls->ls_clear_proc_locks); - unlock_recovery(ls); + dlm_unlock_recovery(ls); +} + +static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) +{ + struct dlm_lkb *lkb, *safe; + + while (1) { + lkb = NULL; + spin_lock(&proc->locks_spin); + if (!list_empty(&proc->locks)) { + lkb = list_entry(proc->locks.next, struct dlm_lkb, + lkb_ownqueue); + list_del_init(&lkb->lkb_ownqueue); + } + spin_unlock(&proc->locks_spin); + + if (!lkb) + break; + + lkb->lkb_flags |= DLM_IFL_DEAD; + unlock_proc_lock(ls, lkb); + dlm_put_lkb(lkb); /* ref from proc->locks list */ + } + + spin_lock(&proc->locks_spin); + list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { + list_del_init(&lkb->lkb_ownqueue); + lkb->lkb_flags |= DLM_IFL_DEAD; + dlm_put_lkb(lkb); + } + spin_unlock(&proc->locks_spin); + + spin_lock(&proc->asts_spin); + list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { + list_del(&lkb->lkb_astqueue); + dlm_put_lkb(lkb); + } + spin_unlock(&proc->asts_spin); +} + +/* pid of 0 means purge all orphans */ + +static void do_purge(struct dlm_ls *ls, int nodeid, int pid) +{ + struct dlm_lkb *lkb, *safe; + + mutex_lock(&ls->ls_orphans_mutex); + list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) { + if (pid && lkb->lkb_ownpid != pid) + continue; + unlock_proc_lock(ls, lkb); + list_del_init(&lkb->lkb_ownqueue); + dlm_put_lkb(lkb); + } + mutex_unlock(&ls->ls_orphans_mutex); +} + +static int send_purge(struct dlm_ls *ls, int nodeid, int pid) +{ + struct dlm_message *ms; + struct dlm_mhandle *mh; + int error; + + error = _create_message(ls, sizeof(struct dlm_message), nodeid, + DLM_MSG_PURGE, &ms, &mh); + if (error) + return error; + ms->m_nodeid = nodeid; + ms->m_pid = pid; + + return send_message(mh, ms); +} + +int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, + int nodeid, int pid) +{ + int error = 0; + + if (nodeid != dlm_our_nodeid()) { + error = send_purge(ls, nodeid, pid); + } else { + dlm_lock_recovery(ls); + if (pid == current->pid) + purge_proc_locks(ls, proc); + else + do_purge(ls, nodeid, pid); + dlm_unlock_recovery(ls); + } + return error; }