R: do_xxxx()
L: receive_xxxx_reply() <- R: send_xxxx_reply()
*/
-
+#include <linux/types.h>
#include "dlm_internal.h"
+#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
+#include "user.h"
#include "config.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
{ -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
{ -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
};
-EXPORT_SYMBOL_GPL(dlm_lvb_operations);
#define modes_compat(gr, rq) \
__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
{0, 0, 0, 0, 0, 0, 0, 0} /* PD */
};
-static void dlm_print_lkb(struct dlm_lkb *lkb)
+void dlm_print_lkb(struct dlm_lkb *lkb)
{
printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
" status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
r->res_recover_locks_count, r->res_name);
}
+void dlm_dump_rsb(struct dlm_rsb *r)
+{
+ struct dlm_lkb *lkb;
+
+ dlm_print_rsb(r);
+
+ printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
+ list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
+ printk(KERN_ERR "rsb lookup list\n");
+ list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb grant queue:\n");
+ list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb convert queue:\n");
+ list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb wait queue:\n");
+ list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+}
+
/* Threads cannot use the lockspace while it's being recovered */
static inline void lock_recovery(struct dlm_ls *ls)
if (len == r->res_length && !memcmp(name, r->res_name, len))
goto found;
}
- return -ENOENT;
+ return -EBADR;
found:
if (r->res_nodeid && (flags & R_MASTER))
if (!error)
goto out;
- if (error == -ENOENT && !(flags & R_CREATE))
+ if (error == -EBADR && !(flags & R_CREATE))
goto out;
/* the rsb was found but wasn't a master copy */
{
int rv;
rv = kref_put(&r->res_ref, toss_rsb);
- DLM_ASSERT(!rv, dlm_print_rsb(r););
+ DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
static void kill_rsb(struct kref *kref)
/* All work is done after the return from kref_put() so we
can release the write_lock before the remove and free. */
- DLM_ASSERT(list_empty(&r->res_lookup),);
- DLM_ASSERT(list_empty(&r->res_grantqueue),);
- DLM_ASSERT(list_empty(&r->res_convertqueue),);
- DLM_ASSERT(list_empty(&r->res_waitqueue),);
- DLM_ASSERT(list_empty(&r->res_root_list),);
- DLM_ASSERT(list_empty(&r->res_recover_list),);
+ DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
lkb->lkb_nodeid = -1;
lkb->lkb_grmode = DLM_LOCK_IV;
kref_init(&lkb->lkb_ref);
+ INIT_LIST_HEAD(&lkb->lkb_ownqueue);
get_random_bytes(&bucket, sizeof(bucket));
bucket &= (ls->ls_lkbtbl_size - 1);
mutex_unlock(&ls->ls_waiters_mutex);
}
+/* We clear the RESEND flag because we might be taking an lkb off the waiters
+ list as part of process_requestqueue (e.g. a lookup that has an optimized
+ request reply on the requestqueue) between dlm_recover_waiters_pre() which
+ set RESEND and dlm_recover_waiters_post() */
+
static int _remove_from_waiters(struct dlm_lkb *lkb)
{
int error = 0;
goto out;
}
lkb->lkb_wait_type = 0;
+ lkb->lkb_flags &= ~DLM_IFL_RESEND;
list_del(&lkb->lkb_wait_reply);
unhold_lkb(lkb);
out:
list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
res_hashchain) {
if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.toss_secs * HZ))
+ dlm_config.ci_toss_secs * HZ))
continue;
found = 1;
break;
if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
return;
- b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+ b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
if (b == 1) {
int len = receive_extralen(ms);
memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
lkb->lkb_rqmode = DLM_LOCK_IV;
switch (lkb->lkb_status) {
+ case DLM_LKSTS_GRANTED:
+ break;
case DLM_LKSTS_CONVERT:
move_lkb(r, lkb, DLM_LKSTS_GRANTED);
break;
struct dlm_lkb *lkb, *s;
int high = DLM_LOCK_IV;
- DLM_ASSERT(is_master(r), dlm_print_rsb(r););
+ DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
high = grant_pending_convert(r, high);
high = grant_pending_wait(r, high);
return 0;
}
- DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
+ DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
dir_nodeid = dlm_dir_nodeid(r);
return -DLM_EUNLOCK;
}
+/* FIXME: if revert_lock() finds that the lkb is granted, we should
+ skip the queue_cast(ECANCEL). It indicates that the request/convert
+ completed (and queued a normal ast) just before the cancel; we don't
+ want to clobber the sb_result for the normal ast with ECANCEL. */
+
static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
revert_lock(r, lkb);
if (lkb->lkb_astaddr)
ms->m_asts |= AST_COMP;
- if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
- memcpy(ms->m_extra, r->res_name, r->res_length);
+ /* compare with switch in create_message; send_remove() doesn't
+ use send_args() */
- else if (lkb->lkb_lvbptr)
+ switch (ms->m_type) {
+ case DLM_MSG_REQUEST:
+ case DLM_MSG_LOOKUP:
+ memcpy(ms->m_extra, r->res_name, r->res_length);
+ break;
+ case DLM_MSG_CONVERT:
+ case DLM_MSG_UNLOCK:
+ case DLM_MSG_REQUEST_REPLY:
+ case DLM_MSG_CONVERT_REPLY:
+ case DLM_MSG_GRANT:
+ if (!lkb->lkb_lvbptr)
+ break;
memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
-
+ break;
+ }
}
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
if (!error && down_conversion(lkb)) {
remove_from_waiters(lkb);
r->res_ls->ls_stub_ms.m_result = 0;
+ r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
}
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
lkb->lkb_exflags = ms->m_exflags;
+ lkb->lkb_sbflags = ms->m_sbflags;
lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
(ms->m_flags & 0x0000FFFF);
}
DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
- if (receive_lvb(ls, lkb, ms))
- return -ENOMEM;
+ if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
+ /* lkb was just created so there won't be an lvb yet */
+ lkb->lkb_lvbptr = allocate_lvb(ls);
+ if (!lkb->lkb_lvbptr)
+ return -ENOMEM;
+ }
return 0;
}
confirm_master(r, error);
break;
- case -ENOENT:
+ case -EBADR:
case -ENOTBLK:
/* find_rsb failed to find rsb or rsb wasn't master */
r->res_nodeid = -1;
{
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_ls *ls;
- int error;
+ int error = 0;
if (!recovery)
dlm_message_in(ms);
while (1) {
if (dlm_locking_stopped(ls)) {
- if (!recovery)
- dlm_add_requestqueue(ls, nodeid, hd);
- error = -EINTR;
- goto out;
+ if (recovery) {
+ error = -EINTR;
+ goto out;
+ }
+ error = dlm_add_requestqueue(ls, nodeid, hd);
+ if (error == -EAGAIN)
+ continue;
+ else {
+ error = -EINTR;
+ goto out;
+ }
}
if (lock_recovery_try(ls))
out:
dlm_put_lockspace(ls);
dlm_astd_wake();
- return 0;
+ return error;
}
if (middle_conversion(lkb)) {
hold_lkb(lkb);
ls->ls_stub_ms.m_result = -EINPROGRESS;
+ ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_remove_from_waiters(lkb);
_receive_convert_reply(lkb, &ls->ls_stub_ms);
case DLM_MSG_UNLOCK:
hold_lkb(lkb);
ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+ ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_remove_from_waiters(lkb);
_receive_unlock_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb);
case DLM_MSG_CANCEL:
hold_lkb(lkb);
ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+ ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_remove_from_waiters(lkb);
_receive_cancel_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb);
log_error(ls, "invalid lkb wait_type %d",
lkb->lkb_wait_type);
}
+ schedule();
}
mutex_unlock(&ls->ls_waiters_mutex);
}
hold_rsb(r);
lock_rsb(r);
_request_lock(r, lkb);
+ if (is_master(r))
+ confirm_master(r, 0);
unlock_rsb(r);
put_rsb(r);
break;
void dlm_grant_after_purge(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- int i;
+ int bucket = 0;
- for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- r = find_purged_rsb(ls, i);
- if (!r)
+ while (1) {
+ r = find_purged_rsb(ls, bucket);
+ if (!r) {
+ if (bucket == ls->ls_rsbtbl_size - 1)
+ break;
+ bucket++;
continue;
+ }
lock_rsb(r);
if (is_master(r)) {
grant_pending_locks(r);
}
unlock_rsb(r);
put_rsb(r);
+ schedule();
}
}
lock_rsb(r);
switch (error) {
+ case -EBADR:
+ /* There's a chance the new master received our lock before
+ dlm_recover_master_reply(), this wouldn't happen if we did
+ a barrier between recover_masters and recover_locks. */
+ log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
+ (unsigned long)r, r->res_name);
+ dlm_send_rcom_lock(r, lkb);
+ goto out;
case -EEXIST:
log_debug(ls, "master copy exists %x", lkb->lkb_id);
/* fall through */
/* an ack for dlm_recover_locks() which waits for replies from
all the locks it sends to new masters */
dlm_recovered_lock(r);
-
+ out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
return 0;
}
+int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
+ int mode, uint32_t flags, void *name, unsigned int namelen,
+ uint32_t parent_lkid)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ int error;
+
+ lock_recovery(ls);
+
+ error = create_lkb(ls, &lkb);
+ if (error) {
+ kfree(ua);
+ goto out;
+ }
+
+ if (flags & DLM_LKF_VALBLK) {
+ ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+ if (!ua->lksb.sb_lvbptr) {
+ kfree(ua);
+ __put_lkb(ls, lkb);
+ error = -ENOMEM;
+ goto out;
+ }
+ }
+
+ /* After ua is attached to lkb it will be freed by free_lkb().
+ When DLM_IFL_USER is set, the dlm knows that this is a userspace
+ lock and that lkb_astparam is the dlm_user_args structure. */
+
+ error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
+ DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
+ lkb->lkb_flags |= DLM_IFL_USER;
+ ua->old_mode = DLM_LOCK_IV;
+
+ if (error) {
+ __put_lkb(ls, lkb);
+ goto out;
+ }
+
+ error = request_lock(ls, lkb, name, namelen, &args);
+
+ switch (error) {
+ case 0:
+ break;
+ case -EINPROGRESS:
+ error = 0;
+ break;
+ case -EAGAIN:
+ error = 0;
+ /* fall through */
+ default:
+ __put_lkb(ls, lkb);
+ goto out;
+ }
+
+ /* add this new lkb to the per-process list of locks */
+ spin_lock(&ua->proc->locks_spin);
+ kref_get(&lkb->lkb_ref);
+ list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
+ spin_unlock(&ua->proc->locks_spin);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ /* user can change the params on its lock when it converts it, or
+ add an lvb that didn't exist before */
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
+ ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+ if (!ua->lksb.sb_lvbptr) {
+ error = -ENOMEM;
+ goto out_put;
+ }
+ }
+ if (lvb_in && ua->lksb.sb_lvbptr)
+ memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+
+ ua->castparam = ua_tmp->castparam;
+ ua->castaddr = ua_tmp->castaddr;
+ ua->bastparam = ua_tmp->bastparam;
+ ua->bastaddr = ua_tmp->bastaddr;
+ ua->user_lksb = ua_tmp->user_lksb;
+ ua->old_mode = lkb->lkb_grmode;
+
+ error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
+ ua, DLM_FAKE_USER_AST, &args);
+ if (error)
+ goto out_put;
+
+ error = convert_lock(ls, lkb, &args);
+
+ if (error == -EINPROGRESS || error == -EAGAIN)
+ error = 0;
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ kfree(ua_tmp);
+ return error;
+}
+
+int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (lvb_in && ua->lksb.sb_lvbptr)
+ memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+ ua->castparam = ua_tmp->castparam;
+ ua->user_lksb = ua_tmp->user_lksb;
+
+ error = set_unlock_args(flags, ua, &args);
+ if (error)
+ goto out_put;
+
+ error = unlock_lock(ls, lkb, &args);
+
+ if (error == -DLM_EUNLOCK)
+ error = 0;
+ if (error)
+ goto out_put;
+
+ spin_lock(&ua->proc->locks_spin);
+ /* dlm_user_add_ast() may have already taken lkb off the proc list */
+ if (!list_empty(&lkb->lkb_ownqueue))
+ list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
+ spin_unlock(&ua->proc->locks_spin);
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ uint32_t flags, uint32_t lkid)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+ ua->castparam = ua_tmp->castparam;
+ ua->user_lksb = ua_tmp->user_lksb;
+
+ error = set_unlock_args(flags, ua, &args);
+ if (error)
+ goto out_put;
+
+ error = cancel_lock(ls, lkb, &args);
+
+ if (error == -DLM_ECANCEL)
+ error = 0;
+ if (error)
+ goto out_put;
+
+ /* this lkb was removed from the WAITING queue */
+ if (lkb->lkb_grmode == DLM_LOCK_IV) {
+ spin_lock(&ua->proc->locks_spin);
+ list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
+ spin_unlock(&ua->proc->locks_spin);
+ }
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+ struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (ua->lksb.sb_lvbptr)
+ kfree(ua->lksb.sb_lvbptr);
+ kfree(ua);
+ lkb->lkb_astparam = (long)NULL;
+
+ /* TODO: propogate to master if needed */
+ return 0;
+}
+
+/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
+ Regardless of what rsb queue the lock is on, it's removed and freed. */
+
+static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+ struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+ struct dlm_args args;
+ int error;
+
+ /* FIXME: we need to handle the case where the lkb is in limbo
+ while the rsb is being looked up, currently we assert in
+ _unlock_lock/is_remote because rsb nodeid is -1. */
+
+ set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
+
+ error = unlock_lock(ls, lkb, &args);
+ if (error == -DLM_EUNLOCK)
+ error = 0;
+ return error;
+}
+
+/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
+ 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
+ which we clear here. */
+
+/* proc CLOSING flag is set so no more device_reads should look at proc->asts
+ list, and no more device_writes should add lkb's to proc->locks list; so we
+ shouldn't need to take asts_spin or locks_spin here. this assumes that
+ device reads/writes/closes are serialized -- FIXME: we may need to serialize
+ them ourself. */
+
+void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
+{
+ struct dlm_lkb *lkb, *safe;
+
+ lock_recovery(ls);
+ mutex_lock(&ls->ls_clear_proc_locks);
+
+ list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
+ list_del_init(&lkb->lkb_ownqueue);
+
+ if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
+ lkb->lkb_flags |= DLM_IFL_ORPHAN;
+ orphan_proc_lock(ls, lkb);
+ } else {
+ lkb->lkb_flags |= DLM_IFL_DEAD;
+ unlock_proc_lock(ls, lkb);
+ }
+
+ /* this removes the reference for the proc->locks list
+ added by dlm_user_request, it may result in the lkb
+ being freed */
+
+ dlm_put_lkb(lkb);
+ }
+
+ /* in-progress unlocks */
+ list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
+ list_del_init(&lkb->lkb_ownqueue);
+ lkb->lkb_flags |= DLM_IFL_DEAD;
+ dlm_put_lkb(lkb);
+ }
+
+ list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+ list_del(&lkb->lkb_astqueue);
+ dlm_put_lkb(lkb);
+ }
+
+ mutex_unlock(&ls->ls_clear_proc_locks);
+ unlock_recovery(ls);
+}
+