rc->rc_type = type;
+ spin_lock(&ls->ls_recover_lock);
+ rc->rc_seq = ls->ls_recover_seq;
+ spin_unlock(&ls->ls_recover_lock);
+
*mh_ret = mh;
*rc_ret = rc;
return 0;
if (error)
return;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
rc->rc_result = dlm_recover_status(ls);
make_config(ls, (struct rcom_config *) rc->rc_buf);
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
- int error, inlen, outlen;
- int nodeid = rc_in->rc_header.h_nodeid;
- uint32_t status = dlm_recover_status(ls);
-
- /*
- * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
- * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
- * It could only happen in rare cases where we get a late NAMES
- * message from a previous instance of recovery.
- */
-
- if (!(status & DLM_RS_NODES)) {
- log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
- return;
- }
+ int error, inlen, outlen, nodeid;
nodeid = rc_in->rc_header.h_nodeid;
inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
if (error)
return;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid);
ret_nodeid = error;
rc->rc_result = ret_nodeid;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
send_rcom(ls, mh, rc);
}
memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
send_rcom(ls, mh, rc);
}
static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
- uint32_t status = dlm_recover_status(ls);
-
- if (!(status & DLM_RS_DIR)) {
- log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
- rc_in->rc_header.h_nodeid);
- return;
- }
-
dlm_recover_process_copy(ls, rc_in);
}
rc->rc_type = DLM_RCOM_STATUS_REPLY;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
rc->rc_result = -ESRCH;
rf = (struct rcom_config *) rc->rc_buf;
return 0;
}
+static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+{
+ uint64_t seq;
+ int rv = 0;
+
+ switch (rc->rc_type) {
+ case DLM_RCOM_STATUS_REPLY:
+ case DLM_RCOM_NAMES_REPLY:
+ case DLM_RCOM_LOOKUP_REPLY:
+ case DLM_RCOM_LOCK_REPLY:
+ spin_lock(&ls->ls_recover_lock);
+ seq = ls->ls_recover_seq;
+ spin_unlock(&ls->ls_recover_lock);
+ if (rc->rc_seq_reply != seq) {
+ log_error(ls, "ignoring old reply %x from %d "
+ "seq_reply %llx expect %llx",
+ rc->rc_type, rc->rc_header.h_nodeid,
+ (unsigned long long)rc->rc_seq_reply,
+ (unsigned long long)seq);
+ rv = 1;
+ }
+ }
+ return rv;
+}
+
/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */
goto out;
}
+ if (is_old_reply(ls, rc))
+ goto out;
+
if (nodeid != rc->rc_header.h_nodeid) {
log_error(ls, "bad rcom nodeid %d from %d",
rc->rc_header.h_nodeid, nodeid);
rc->rc_type = cpu_to_le32(rc->rc_type);
rc->rc_result = cpu_to_le32(rc->rc_result);
rc->rc_id = cpu_to_le64(rc->rc_id);
+ rc->rc_seq = cpu_to_le64(rc->rc_seq);
+ rc->rc_seq_reply = cpu_to_le64(rc->rc_seq_reply);
if (type == DLM_RCOM_LOCK)
rcom_lock_out((struct rcom_lock *) rc->rc_buf);
rc->rc_type = le32_to_cpu(rc->rc_type);
rc->rc_result = le32_to_cpu(rc->rc_result);
rc->rc_id = le64_to_cpu(rc->rc_id);
+ rc->rc_seq = le64_to_cpu(rc->rc_seq);
+ rc->rc_seq_reply = le64_to_cpu(rc->rc_seq_reply);
if (rc->rc_type == DLM_RCOM_LOCK)
rcom_lock_in((struct rcom_lock *) rc->rc_buf);