DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
-static int put_lkb(struct dlm_lkb *lkb)
+/* __put_lkb() is used when an lkb may not have an rsb attached to
+ it so we need to provide the lockspace explicitly */
+
+static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
uint16_t bucket = lkb->lkb_id & 0xFFFF;
write_lock(&ls->ls_lkbtbl[bucket].lock);
/* for local/process lkbs, lvbptr points to caller's lksb */
if (lkb->lkb_lvbptr && is_master_copy(lkb))
free_lvb(lkb->lkb_lvbptr);
- if (lkb->lkb_range)
- free_range(lkb->lkb_range);
free_lkb(lkb);
return 1;
} else {
int dlm_put_lkb(struct dlm_lkb *lkb)
{
- return put_lkb(lkb);
+ struct dlm_ls *ls;
+
+ DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
+ DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
+
+ ls = lkb->lkb_resource->res_ls;
+ return __put_lkb(ls, lkb);
}
/* This is only called to add a reference when the code already holds
}
lkb->lkb_rqmode = DLM_LOCK_IV;
-
- if (lkb->lkb_range) {
- lkb->lkb_range[GR_RANGE_START] = lkb->lkb_range[RQ_RANGE_START];
- lkb->lkb_range[GR_RANGE_END] = lkb->lkb_range[RQ_RANGE_END];
- }
}
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
return 0;
}
-/* Return 1 if the locks' ranges overlap. If the lkb has no range then it is
- assumed to cover 0-ffffffff.ffffffff */
-
-static inline int ranges_overlap(struct dlm_lkb *lkb1, struct dlm_lkb *lkb2)
-{
- if (!lkb1->lkb_range || !lkb2->lkb_range)
- return 1;
-
- if (lkb1->lkb_range[RQ_RANGE_END] < lkb2->lkb_range[GR_RANGE_START] ||
- lkb1->lkb_range[RQ_RANGE_START] > lkb2->lkb_range[GR_RANGE_END])
- return 0;
-
- return 1;
-}
-
/* Check if the given lkb conflicts with another lkb on the queue. */
static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
list_for_each_entry(this, head, lkb_statequeue) {
if (this == lkb)
continue;
- if (ranges_overlap(lkb, this) && !modes_compat(this, lkb))
+ if (!modes_compat(this, lkb))
return 1;
}
return 0;
continue;
}
- if (!ranges_overlap(lkb, this))
- continue;
-
if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
return 1;
}
return 1;
/*
- * When using range locks the NOORDER flag is set to avoid the standard
- * vms rules on grant order.
+ * The NOORDER flag is set to avoid the standard vms rules on grant
+ * order.
*/
if (lkb->lkb_exflags & DLM_LKF_NOORDER)
/*
* If there are locks left on the wait/convert queue then send blocking
* ASTs to granted locks based on the largest requested mode (high)
- * found above. This can generate spurious blocking ASTs for range
- * locks. FIXME: highbast < high comparison not valid for PR/CW.
+ * found above. FIXME: highbast < high comparison not valid for PR/CW.
*/
list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
list_for_each_entry(gr, head, lkb_statequeue) {
if (gr->lkb_bastaddr &&
gr->lkb_highbast < lkb->lkb_rqmode &&
- ranges_overlap(lkb, gr) && !modes_compat(gr, lkb)) {
+ !modes_compat(gr, lkb)) {
queue_bast(r, gr, lkb->lkb_rqmode);
gr->lkb_highbast = lkb->lkb_rqmode;
}
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
int namelen, uint32_t parent_lkid, void *ast,
- void *astarg, void *bast, struct dlm_range *range,
- struct dlm_args *args)
+ void *astarg, void *bast, struct dlm_args *args)
{
int rv = -EINVAL;
args->bastaddr = bast;
args->mode = mode;
args->lksb = lksb;
- args->range = range;
rv = 0;
out:
return rv;
lkb->lkb_lksb = args->lksb;
lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
lkb->lkb_ownpid = (int) current->pid;
-
- rv = 0;
- if (!args->range)
- goto out;
-
- if (!lkb->lkb_range) {
- rv = -ENOMEM;
- lkb->lkb_range = allocate_range(ls);
- if (!lkb->lkb_range)
- goto out;
- /* This is needed for conversions that contain ranges
- where the original lock didn't but it's harmless for
- new locks too. */
- lkb->lkb_range[GR_RANGE_START] = 0LL;
- lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
- }
-
- lkb->lkb_range[RQ_RANGE_START] = args->range->ra_start;
- lkb->lkb_range[RQ_RANGE_END] = args->range->ra_end;
- lkb->lkb_flags |= DLM_IFL_RANGE;
rv = 0;
out:
return rv;
return error;
}
-/* change some property of an existing lkb, e.g. mode, range */
+/* change some property of an existing lkb, e.g. mode */
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
uint32_t parent_lkid,
void (*ast) (void *astarg),
void *astarg,
- void (*bast) (void *astarg, int mode),
- struct dlm_range *range)
+ void (*bast) (void *astarg, int mode))
{
struct dlm_ls *ls;
struct dlm_lkb *lkb;
goto out;
error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
- astarg, bast, range, &args);
+ astarg, bast, &args);
if (error)
goto out_put;
error = 0;
out_put:
if (convert || error)
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
if (error == -EAGAIN)
error = 0;
out:
if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
error = 0;
out_put:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
out:
unlock_recovery(ls);
dlm_put_lockspace(ls);
if (lkb->lkb_astaddr)
ms->m_asts |= AST_COMP;
- if (lkb->lkb_range) {
- ms->m_range[0] = lkb->lkb_range[RQ_RANGE_START];
- ms->m_range[1] = lkb->lkb_range[RQ_RANGE_END];
- }
-
if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
memcpy(ms->m_extra, r->res_name, r->res_length);
return (ms->m_header.h_length - sizeof(struct dlm_message));
}
-static int receive_range(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms)
-{
- if (lkb->lkb_flags & DLM_IFL_RANGE) {
- if (!lkb->lkb_range)
- lkb->lkb_range = allocate_range(ls);
- if (!lkb->lkb_range)
- return -ENOMEM;
- lkb->lkb_range[RQ_RANGE_START] = ms->m_range[0];
- lkb->lkb_range[RQ_RANGE_END] = ms->m_range[1];
- }
- return 0;
-}
-
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms)
{
DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
- if (receive_range(ls, lkb, ms))
- return -ENOMEM;
-
if (receive_lvb(ls, lkb, ms))
return -ENOMEM;
if (lkb->lkb_status != DLM_LKSTS_GRANTED)
return -EBUSY;
- if (receive_range(ls, lkb, ms))
- return -ENOMEM;
- if (lkb->lkb_range) {
- lkb->lkb_range[GR_RANGE_START] = 0LL;
- lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
- }
-
if (receive_lvb(ls, lkb, ms))
return -ENOMEM;
lkb->lkb_flags |= DLM_IFL_MSTCPY;
error = receive_request_args(ls, lkb, ms);
if (error) {
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
goto fail;
}
error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
if (error) {
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
goto fail;
}
if (error == -EINPROGRESS)
error = 0;
if (error)
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
_receive_convert_reply(lkb, ms);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
_receive_unlock_reply(lkb, ms);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
_receive_cancel_reply(lkb, ms);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
_remove_from_waiters(lkb);
_receive_unlock_reply(lkb, &ls->ls_stub_ms);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
break;
case DLM_MSG_CANCEL:
ls->ls_stub_ms.m_result = -DLM_ECANCEL;
_remove_from_waiters(lkb);
_receive_cancel_reply(lkb, &ls->ls_stub_ms);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
break;
default:
if (test(ls, lkb)) {
del_lkb(r, lkb);
/* this put should free the lkb */
- if (!put_lkb(lkb))
+ if (!dlm_put_lkb(lkb))
log_error(ls, "purged lkb not released");
}
}
lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
- if (lkb->lkb_flags & DLM_IFL_RANGE) {
- lkb->lkb_range = allocate_range(ls);
- if (!lkb->lkb_range)
- return -ENOMEM;
- memcpy(lkb->lkb_range, rl->rl_range, 4*sizeof(uint64_t));
- }
-
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
lkb->lkb_lvbptr = allocate_lvb(ls);
if (!lkb->lkb_lvbptr)
error = receive_rcom_lock_args(ls, lkb, r, rc);
if (error) {
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
goto out_unlock;
}
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return 0;
}