/******************************************************************************
*******************************************************************************
**
-** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
+** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
-void dlm_timeout_warn(struct dlm_lkb *lkb);
/*
* Lock compatibilty matrix - thanks Steve
{
struct dlm_rsb *r;
- r = allocate_rsb(ls, len);
+ r = dlm_allocate_rsb(ls, len);
if (!r)
return NULL;
error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
if (!error) {
write_unlock(&ls->ls_rsbtbl[bucket].lock);
- free_rsb(r);
+ dlm_free_rsb(r);
r = tmp;
goto out;
}
list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
r->res_toss_time = jiffies;
if (r->res_lvbptr) {
- free_lvb(r->res_lvbptr);
+ dlm_free_lvb(r->res_lvbptr);
r->res_lvbptr = NULL;
}
}
uint32_t lkid = 0;
uint16_t bucket;
- lkb = allocate_lkb(ls);
+ lkb = dlm_allocate_lkb(ls);
if (!lkb)
return -ENOMEM;
/* for local/process lkbs, lvbptr points to caller's lksb */
if (lkb->lkb_lvbptr && is_master_copy(lkb))
- free_lvb(lkb->lkb_lvbptr);
- free_lkb(lkb);
+ dlm_free_lvb(lkb->lkb_lvbptr);
+ dlm_free_lkb(lkb);
return 1;
} else {
write_unlock(&ls->ls_lkbtbl[bucket].lock);
if (is_master(r))
dir_remove(r);
- free_rsb(r);
+ dlm_free_rsb(r);
count++;
} else {
write_unlock(&ls->ls_rsbtbl[b].lock);
return;
if (!r->res_lvbptr)
- r->res_lvbptr = allocate_lvb(r->res_ls);
+ r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
if (!r->res_lvbptr)
return;
return;
if (!r->res_lvbptr)
- r->res_lvbptr = allocate_lvb(r->res_ls);
+ r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
if (!r->res_lvbptr)
return;
break;
case -EAGAIN:
- /* the remote master didn't queue our NOQUEUE request;
- make a waiting lkb the first_lkid */
+ case -EBADR:
+ case -ENOTBLK:
+ /* the remote request failed and won't be retried (it was
+ a NOQUEUE, or has been canceled/unlocked); make a waiting
+ lkb the first_lkid */
r->res_first_lkid = 0;
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
if (!lkb->lkb_lvbptr)
- lkb->lkb_lvbptr = allocate_lvb(ls);
+ lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
if (!lkb->lkb_lvbptr)
return -ENOMEM;
len = receive_extralen(ms);
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
/* lkb was just created so there won't be an lvb yet */
- lkb->lkb_lvbptr = allocate_lvb(ls);
+ lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
if (!lkb->lkb_lvbptr)
return -ENOMEM;
}
if (is_overlap(lkb)) {
/* we'll ignore error in cancel/unlock reply */
queue_cast_overlap(r, lkb);
+ confirm_master(r, result);
unhold_lkb(lkb); /* undoes create_lkb() */
} else
_request_lock(r, lkb);
dlm_put_lkb(lkb);
}
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
- struct dlm_message *ms = (struct dlm_message *) hd;
- struct dlm_ls *ls;
- int error = 0;
-
- if (!recovery)
- dlm_message_in(ms);
-
- ls = dlm_find_lockspace_global(hd->h_lockspace);
- if (!ls) {
- log_print("drop message %d from %d for unknown lockspace %d",
- ms->m_type, nodeid, hd->h_lockspace);
- return -EINVAL;
- }
-
- /* recovery may have just ended leaving a bunch of backed-up requests
- in the requestqueue; wait while dlm_recoverd clears them */
-
- if (!recovery)
- dlm_wait_requestqueue(ls);
-
- /* recovery may have just started while there were a bunch of
- in-flight requests -- save them in requestqueue to be processed
- after recovery. we can't let dlm_recvd block on the recovery
- lock. if dlm_recoverd is calling this function to clear the
- requestqueue, it needs to be interrupted (-EINTR) if another
- recovery operation is starting. */
-
- while (1) {
- if (dlm_locking_stopped(ls)) {
- if (recovery) {
- error = -EINTR;
- goto out;
- }
- error = dlm_add_requestqueue(ls, nodeid, hd);
- if (error == -EAGAIN)
- continue;
- else {
- error = -EINTR;
- goto out;
- }
- }
-
- if (dlm_lock_recovery_try(ls))
- break;
- schedule();
+ if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
+ log_debug(ls, "ignore non-member message %d from %d %x %x %d",
+ ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
+ ms->m_remid, ms->m_result);
+ return;
}
switch (ms->m_type) {
log_error(ls, "unknown message type %d", ms->m_type);
}
- dlm_unlock_recovery(ls);
- out:
- dlm_put_lockspace(ls);
dlm_astd_wake();
- return error;
}
+/* If the lockspace is in recovery mode (locking stopped), then normal
+ messages are saved on the requestqueue for processing after recovery is
+ done. When not in recovery mode, we wait for dlm_recoverd to drain saved
+ messages off the requestqueue before we process new ones. This occurs right
+ after recovery completes when we transition from saving all messages on
+ requestqueue, to processing all the saved messages, to processing new
+ messages as they arrive. */
-/*
- * Recovery related
- */
+static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+ int nodeid)
+{
+ if (dlm_locking_stopped(ls)) {
+ dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+ } else {
+ dlm_wait_requestqueue(ls);
+ _receive_message(ls, ms);
+ }
+}
+
+/* This is called by dlm_recoverd to process messages that were saved on
+ the requestqueue. */
+
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+{
+ _receive_message(ls, ms);
+}
+
+/* This is called by the midcomms layer when something is received for
+ the lockspace. It could be either a MSG (normal message sent as part of
+ standard locking activity) or an RCOM (recovery message sent as part of
+ lockspace recovery). */
+
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+{
+ struct dlm_message *ms = (struct dlm_message *) hd;
+ struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+ struct dlm_ls *ls;
+ int type = 0;
+
+ switch (hd->h_cmd) {
+ case DLM_MSG:
+ dlm_message_in(ms);
+ type = ms->m_type;
+ break;
+ case DLM_RCOM:
+ dlm_rcom_in(rc);
+ type = rc->rc_type;
+ break;
+ default:
+ log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+ return;
+ }
+
+ if (hd->h_nodeid != nodeid) {
+ log_print("invalid h_nodeid %d from %d lockspace %x",
+ hd->h_nodeid, nodeid, hd->h_lockspace);
+ return;
+ }
+
+ ls = dlm_find_lockspace_global(hd->h_lockspace);
+ if (!ls) {
+ log_print("invalid h_lockspace %x from %d cmd %d type %d",
+ hd->h_lockspace, nodeid, hd->h_cmd, type);
+
+ if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
+ dlm_send_ls_not_ready(nodeid, rc);
+ return;
+ }
+
+ /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
+ be inactive (in this ls) before transitioning to recovery mode */
+
+ down_read(&ls->ls_recv_active);
+ if (hd->h_cmd == DLM_MSG)
+ dlm_receive_message(ls, ms, nodeid);
+ else
+ dlm_receive_rcom(ls, rc, nodeid);
+ up_read(&ls->ls_recv_active);
+
+ dlm_put_lockspace(ls);
+}
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
struct dlm_lkb *lkb, *safe;
+ int wait_type, stub_unlock_result, stub_cancel_result;
mutex_lock(&ls->ls_waiters_mutex);
if (!waiter_needs_recovery(ls, lkb))
continue;
- switch (lkb->lkb_wait_type) {
+ wait_type = lkb->lkb_wait_type;
+ stub_unlock_result = -DLM_EUNLOCK;
+ stub_cancel_result = -DLM_ECANCEL;
+
+ /* Main reply may have been received leaving a zero wait_type,
+ but a reply for the overlapping op may not have been
+ received. In that case we need to fake the appropriate
+ reply for the overlap op. */
+
+ if (!wait_type) {
+ if (is_overlap_cancel(lkb)) {
+ wait_type = DLM_MSG_CANCEL;
+ if (lkb->lkb_grmode == DLM_LOCK_IV)
+ stub_cancel_result = 0;
+ }
+ if (is_overlap_unlock(lkb)) {
+ wait_type = DLM_MSG_UNLOCK;
+ if (lkb->lkb_grmode == DLM_LOCK_IV)
+ stub_unlock_result = -ENOENT;
+ }
+
+ log_debug(ls, "rwpre overlap %x %x %d %d %d",
+ lkb->lkb_id, lkb->lkb_flags, wait_type,
+ stub_cancel_result, stub_unlock_result);
+ }
+
+ switch (wait_type) {
case DLM_MSG_REQUEST:
lkb->lkb_flags |= DLM_IFL_RESEND;
case DLM_MSG_UNLOCK:
hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
- ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+ ls->ls_stub_ms.m_result = stub_unlock_result;
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_receive_unlock_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb);
case DLM_MSG_CANCEL:
hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
- ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+ ls->ls_stub_ms.m_result = stub_cancel_result;
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_receive_cancel_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb);
break;
default:
- log_error(ls, "invalid lkb wait_type %d",
- lkb->lkb_wait_type);
+ log_error(ls, "invalid lkb wait_type %d %d",
+ lkb->lkb_wait_type, wait_type);
}
schedule();
}
lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
- lkb->lkb_lvbptr = allocate_lvb(ls);
+ lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
if (!lkb->lkb_lvbptr)
return -ENOMEM;
lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
put_rsb(r);
out:
if (error)
- log_print("recover_master_copy %d %x", error, rl->rl_lkid);
+ log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
rl->rl_result = error;
return error;
}
}
}
- /* After ua is attached to lkb it will be freed by free_lkb().
+ /* After ua is attached to lkb it will be freed by dlm_free_lkb().
When DLM_IFL_USER is set, the dlm knows that this is a userspace
lock and that lkb_astparam is the dlm_user_args structure. */
if (lvb_in && ua->lksb.sb_lvbptr)
memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
- ua->castparam = ua_tmp->castparam;
+ if (ua_tmp->castparam)
+ ua->castparam = ua_tmp->castparam;
ua->user_lksb = ua_tmp->user_lksb;
error = set_unlock_args(flags, ua, &args);
goto out;
ua = (struct dlm_user_args *)lkb->lkb_astparam;
- ua->castparam = ua_tmp->castparam;
+ if (ua_tmp->castparam)
+ ua->castparam = ua_tmp->castparam;
ua->user_lksb = ua_tmp->user_lksb;
error = set_unlock_args(flags, ua, &args);
}
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+ lkb->lkb_ast_type = 0;
list_del(&lkb->lkb_astqueue);
dlm_put_lkb(lkb);
}