Commit 553b5eb9 authored by Joel Becker's avatar Joel Becker

ocfs2: Pass the locking protocol into ocfs2_cluster_connect().

Inside the stackglue, the locking protocol structure is hanging off of
the ocfs2_cluster_connection.  This takes it one further; the locking
protocol is passed into ocfs2_cluster_connect().  Now different cluster
connections can have different locking protocols with distinct asts.
Note that all locking protocols have to keep their maximum protocol
version in lock-step.

With the protocol structure set in ocfs2_cluster_connect(), there is no
need for the stackglue to have a static pointer to a specific protocol
structure.  We can change initialization to only pass in the maximum
protocol version.
Signed-off-by: default avatarJoel Becker <joel.becker@oracle.com>
parent e603cfb0
...@@ -1045,7 +1045,6 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) ...@@ -1045,7 +1045,6 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
return lockres->l_pending_gen; return lockres->l_pending_gen;
} }
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{ {
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
...@@ -1139,6 +1138,88 @@ static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) ...@@ -1139,6 +1138,88 @@ static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
spin_unlock_irqrestore(&lockres->l_lock, flags); spin_unlock_irqrestore(&lockres->l_lock, flags);
} }
static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
unsigned long flags;
mlog_entry_void();
mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
lockres->l_unlock_action);
spin_lock_irqsave(&lockres->l_lock, flags);
if (error) {
mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
"unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
return;
}
switch(lockres->l_unlock_action) {
case OCFS2_UNLOCK_CANCEL_CONVERT:
mlog(0, "Cancel convert success for %s\n", lockres->l_name);
lockres->l_action = OCFS2_AST_INVALID;
/* Downconvert thread may have requeued this lock, we
* need to wake it. */
if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
break;
case OCFS2_UNLOCK_DROP_LOCK:
lockres->l_level = DLM_LOCK_IV;
break;
default:
BUG();
}
lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
wake_up(&lockres->l_event);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
}
/*
* This is the filesystem locking protocol. It provides the lock handling
* hooks for the underlying DLM. It has a maximum version number.
* The version number allows interoperability with systems running at
* the same major number and an equal or smaller minor number.
*
* Whenever the filesystem does new things with locks (adds or removes a
* lock, orders them differently, does different things underneath a lock),
* the version must be changed. The protocol is negotiated when joining
* the dlm domain. A node may join the domain if its major version is
* identical to all other nodes and its minor version is greater than
* or equal to all other nodes. When its minor version is greater than
* the other nodes, it will run at the minor version specified by the
* other nodes.
*
* If a locking change is made that will not be compatible with older
* versions, the major number must be increased and the minor version set
* to zero. If a change merely adds a behavior that can be disabled when
* speaking to older versions, the minor version must be increased. If a
* change adds a fully backwards compatible change (eg, LVB changes that
* are just ignored by older versions), the version does not need to be
* updated.
*/
static struct ocfs2_locking_protocol lproto = {
.lp_max_version = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
},
.lp_lock_ast = ocfs2_locking_ast,
.lp_blocking_ast = ocfs2_blocking_ast,
.lp_unlock_ast = ocfs2_unlock_ast,
};
void ocfs2_set_locking_protocol(void)
{
ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
int convert) int convert)
{ {
...@@ -2991,7 +3072,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb) ...@@ -2991,7 +3072,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
status = ocfs2_cluster_connect(osb->osb_cluster_stack, status = ocfs2_cluster_connect(osb->osb_cluster_stack,
osb->uuid_str, osb->uuid_str,
strlen(osb->uuid_str), strlen(osb->uuid_str),
ocfs2_do_node_down, osb, &lproto, ocfs2_do_node_down, osb,
&conn); &conn);
if (status) { if (status) {
mlog_errno(status); mlog_errno(status);
...@@ -3058,50 +3139,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb, ...@@ -3058,50 +3139,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
mlog_exit_void(); mlog_exit_void();
} }
static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
unsigned long flags;
mlog_entry_void();
mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
lockres->l_unlock_action);
spin_lock_irqsave(&lockres->l_lock, flags);
if (error) {
mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
"unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
return;
}
switch(lockres->l_unlock_action) {
case OCFS2_UNLOCK_CANCEL_CONVERT:
mlog(0, "Cancel convert success for %s\n", lockres->l_name);
lockres->l_action = OCFS2_AST_INVALID;
/* Downconvert thread may have requeued this lock, we
* need to wake it. */
if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
break;
case OCFS2_UNLOCK_DROP_LOCK:
lockres->l_level = DLM_LOCK_IV;
break;
default:
BUG();
}
lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
wake_up(&lockres->l_event);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
}
static int ocfs2_drop_lock(struct ocfs2_super *osb, static int ocfs2_drop_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres) struct ocfs2_lock_res *lockres)
{ {
...@@ -3910,45 +3947,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) ...@@ -3910,45 +3947,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
ocfs2_cluster_unlock(osb, lockres, level); ocfs2_cluster_unlock(osb, lockres, level);
} }
/*
* This is the filesystem locking protocol. It provides the lock handling
* hooks for the underlying DLM. It has a maximum version number.
* The version number allows interoperability with systems running at
* the same major number and an equal or smaller minor number.
*
* Whenever the filesystem does new things with locks (adds or removes a
* lock, orders them differently, does different things underneath a lock),
* the version must be changed. The protocol is negotiated when joining
* the dlm domain. A node may join the domain if its major version is
* identical to all other nodes and its minor version is greater than
* or equal to all other nodes. When its minor version is greater than
* the other nodes, it will run at the minor version specified by the
* other nodes.
*
* If a locking change is made that will not be compatible with older
* versions, the major number must be increased and the minor version set
* to zero. If a change merely adds a behavior that can be disabled when
* speaking to older versions, the minor version must be increased. If a
* change adds a fully backwards compatible change (eg, LVB changes that
* are just ignored by older versions), the version does not need to be
* updated.
*/
static struct ocfs2_locking_protocol lproto = {
.lp_max_version = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
},
.lp_lock_ast = ocfs2_locking_ast,
.lp_blocking_ast = ocfs2_blocking_ast,
.lp_unlock_ast = ocfs2_unlock_ast,
};
void ocfs2_set_locking_protocol(void)
{
ocfs2_stack_glue_set_locking_protocol(&lproto);
}
static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres) struct ocfs2_lock_res *lockres)
{ {
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
#define OCFS2_STACK_PLUGIN_USER "user" #define OCFS2_STACK_PLUGIN_USER "user"
#define OCFS2_MAX_HB_CTL_PATH 256 #define OCFS2_MAX_HB_CTL_PATH 256
static struct ocfs2_locking_protocol *lproto; static struct ocfs2_protocol_version locking_max_version;
static DEFINE_SPINLOCK(ocfs2_stack_lock); static DEFINE_SPINLOCK(ocfs2_stack_lock);
static LIST_HEAD(ocfs2_stack_list); static LIST_HEAD(ocfs2_stack_list);
static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1]; static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1];
...@@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin) ...@@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
spin_lock(&ocfs2_stack_lock); spin_lock(&ocfs2_stack_lock);
if (!ocfs2_stack_lookup(plugin->sp_name)) { if (!ocfs2_stack_lookup(plugin->sp_name)) {
plugin->sp_count = 0; plugin->sp_count = 0;
plugin->sp_max_proto = lproto->lp_max_version; plugin->sp_max_proto = locking_max_version;
list_add(&plugin->sp_list, &ocfs2_stack_list); list_add(&plugin->sp_list, &ocfs2_stack_list);
printk(KERN_INFO "ocfs2: Registered cluster interface %s\n", printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
plugin->sp_name); plugin->sp_name);
...@@ -213,23 +213,23 @@ void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin) ...@@ -213,23 +213,23 @@ void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin)
} }
EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister); EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister);
void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto) void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto)
{ {
struct ocfs2_stack_plugin *p; struct ocfs2_stack_plugin *p;
BUG_ON(proto == NULL);
spin_lock(&ocfs2_stack_lock); spin_lock(&ocfs2_stack_lock);
BUG_ON(active_stack != NULL); if (memcmp(max_proto, &locking_max_version,
sizeof(struct ocfs2_protocol_version))) {
BUG_ON(locking_max_version.pv_major != 0);
lproto = proto; locking_max_version = *max_proto;
list_for_each_entry(p, &ocfs2_stack_list, sp_list) { list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
p->sp_max_proto = lproto->lp_max_version; p->sp_max_proto = locking_max_version;
}
} }
spin_unlock(&ocfs2_stack_lock); spin_unlock(&ocfs2_stack_lock);
} }
EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol); EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_max_proto_version);
/* /*
...@@ -245,8 +245,6 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn, ...@@ -245,8 +245,6 @@ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
void *name, void *name,
unsigned int namelen) unsigned int namelen)
{ {
BUG_ON(lproto == NULL);
if (!lksb->lksb_conn) if (!lksb->lksb_conn)
lksb->lksb_conn = conn; lksb->lksb_conn = conn;
else else
...@@ -260,7 +258,6 @@ int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn, ...@@ -260,7 +258,6 @@ int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
struct ocfs2_dlm_lksb *lksb, struct ocfs2_dlm_lksb *lksb,
u32 flags) u32 flags)
{ {
BUG_ON(lproto == NULL);
BUG_ON(lksb->lksb_conn == NULL); BUG_ON(lksb->lksb_conn == NULL);
return active_stack->sp_ops->dlm_unlock(conn, lksb, flags); return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
...@@ -314,6 +311,7 @@ EXPORT_SYMBOL_GPL(ocfs2_plock); ...@@ -314,6 +311,7 @@ EXPORT_SYMBOL_GPL(ocfs2_plock);
int ocfs2_cluster_connect(const char *stack_name, int ocfs2_cluster_connect(const char *stack_name,
const char *group, const char *group,
int grouplen, int grouplen,
struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num, void (*recovery_handler)(int node_num,
void *recovery_data), void *recovery_data),
void *recovery_data, void *recovery_data,
...@@ -331,6 +329,12 @@ int ocfs2_cluster_connect(const char *stack_name, ...@@ -331,6 +329,12 @@ int ocfs2_cluster_connect(const char *stack_name,
goto out; goto out;
} }
if (memcmp(&lproto->lp_max_version, &locking_max_version,
sizeof(struct ocfs2_protocol_version))) {
rc = -EINVAL;
goto out;
}
new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection), new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
GFP_KERNEL); GFP_KERNEL);
if (!new_conn) { if (!new_conn) {
...@@ -456,10 +460,10 @@ static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj, ...@@ -456,10 +460,10 @@ static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj,
ssize_t ret = 0; ssize_t ret = 0;
spin_lock(&ocfs2_stack_lock); spin_lock(&ocfs2_stack_lock);
if (lproto) if (locking_max_version.pv_major)
ret = snprintf(buf, PAGE_SIZE, "%u.%u\n", ret = snprintf(buf, PAGE_SIZE, "%u.%u\n",
lproto->lp_max_version.pv_major, locking_max_version.pv_major,
lproto->lp_max_version.pv_minor); locking_max_version.pv_minor);
spin_unlock(&ocfs2_stack_lock); spin_unlock(&ocfs2_stack_lock);
return ret; return ret;
...@@ -688,7 +692,10 @@ static int __init ocfs2_stack_glue_init(void) ...@@ -688,7 +692,10 @@ static int __init ocfs2_stack_glue_init(void)
static void __exit ocfs2_stack_glue_exit(void) static void __exit ocfs2_stack_glue_exit(void)
{ {
lproto = NULL; memset(&locking_max_version, 0,
sizeof(struct ocfs2_protocol_version));
locking_max_version.pv_major = 0;
locking_max_version.pv_minor = 0;
ocfs2_sysfs_exit(); ocfs2_sysfs_exit();
if (ocfs2_table_header) if (ocfs2_table_header)
unregister_sysctl_table(ocfs2_table_header); unregister_sysctl_table(ocfs2_table_header);
......
...@@ -241,6 +241,7 @@ struct ocfs2_stack_plugin { ...@@ -241,6 +241,7 @@ struct ocfs2_stack_plugin {
int ocfs2_cluster_connect(const char *stack_name, int ocfs2_cluster_connect(const char *stack_name,
const char *group, const char *group,
int grouplen, int grouplen,
struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num, void (*recovery_handler)(int node_num,
void *recovery_data), void *recovery_data),
void *recovery_data, void *recovery_data,
...@@ -270,7 +271,7 @@ int ocfs2_stack_supports_plocks(void); ...@@ -270,7 +271,7 @@ int ocfs2_stack_supports_plocks(void);
int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino, int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino,
struct file *file, int cmd, struct file_lock *fl); struct file *file, int cmd, struct file_lock *fl);
void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto); void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto);
/* Used by stack plugins */ /* Used by stack plugins */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment