Commit e7fd4179 authored by David Teigland's avatar David Teigland Committed by Steven Whitehouse

[DLM] The core of the DLM for GFS2/CLVM

This is the core of the distributed lock manager which is required
to use GFS2 as a cluster filesystem. It is also used by CLVM and
can be used as a standalone lock manager independantly of either
of these two projects.

It implements VAX-style locking modes.
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
Signed-off-by: default avatarSteve Whitehouse <swhiteho@redhat.com>
parent e4731420
......@@ -1831,6 +1831,7 @@ source "fs/partitions/Kconfig"
endmenu
source "fs/nls/Kconfig"
source "fs/dlm/Kconfig"
endmenu
......@@ -48,6 +48,7 @@ obj-$(CONFIG_SYSFS) += sysfs/
obj-y += devpts/
obj-$(CONFIG_PROFILING) += dcookies.o
obj-$(CONFIG_DLM) += dlm/
# Do not add any filesystems before this line
obj-$(CONFIG_REISERFS_FS) += reiserfs/
......
menu "Distributed Lock Manager"
depends on INET && EXPERIMENTAL
config DLM
tristate "Distributed Lock Manager (DLM)"
depends on SYSFS
depends on IPV6 || IPV6=n
select IP_SCTP
select CONFIGFS_FS
help
A general purpose distributed lock manager for kernel or userspace
applications.
config DLM_DEVICE
tristate "DLM device for userspace access"
depends on DLM
help
This module creates a misc device through which the dlm lockspace
and locking functions become available to userspace applications
(usually through the libdlm library).
config DLM_DEBUG
bool "DLM debugging"
depends on DLM
help
Under the debugfs mount point, the name of each lockspace will
appear as a file in the "dlm" directory. The output is the
list of resource and locks the local node knows about.
endmenu
obj-$(CONFIG_DLM) += dlm.o
obj-$(CONFIG_DLM_DEVICE) += dlm_device.o
dlm-y := ast.o \
config.o \
dir.o \
lock.o \
lockspace.o \
lowcomms.o \
main.o \
member.o \
memory.o \
midcomms.o \
rcom.o \
recover.o \
recoverd.o \
requestqueue.o \
util.o
dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o
dlm_device-y := device.o
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "lock.h"
#include "ast.h"
#define WAKE_ASTS 0
static struct list_head ast_queue;
static spinlock_t ast_queue_lock;
static struct task_struct * astd_task;
static unsigned long astd_wakeflags;
static struct semaphore astd_running;
void dlm_del_ast(struct dlm_lkb *lkb)
{
spin_lock(&ast_queue_lock);
if (lkb->lkb_ast_type & (AST_COMP | AST_BAST))
list_del(&lkb->lkb_astqueue);
spin_unlock(&ast_queue_lock);
}
void dlm_add_ast(struct dlm_lkb *lkb, int type)
{
spin_lock(&ast_queue_lock);
if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
kref_get(&lkb->lkb_ref);
list_add_tail(&lkb->lkb_astqueue, &ast_queue);
}
lkb->lkb_ast_type |= type;
spin_unlock(&ast_queue_lock);
set_bit(WAKE_ASTS, &astd_wakeflags);
wake_up_process(astd_task);
}
static void process_asts(void)
{
struct dlm_ls *ls = NULL;
struct dlm_rsb *r = NULL;
struct dlm_lkb *lkb;
void (*cast) (long param);
void (*bast) (long param, int mode);
int type = 0, found, bmode;
for (;;) {
found = FALSE;
spin_lock(&ast_queue_lock);
list_for_each_entry(lkb, &ast_queue, lkb_astqueue) {
r = lkb->lkb_resource;
ls = r->res_ls;
if (dlm_locking_stopped(ls))
continue;
list_del(&lkb->lkb_astqueue);
type = lkb->lkb_ast_type;
lkb->lkb_ast_type = 0;
found = TRUE;
break;
}
spin_unlock(&ast_queue_lock);
if (!found)
break;
cast = lkb->lkb_astaddr;
bast = lkb->lkb_bastaddr;
bmode = lkb->lkb_bastmode;
if ((type & AST_COMP) && cast)
cast(lkb->lkb_astparam);
/* FIXME: Is it safe to look at lkb_grmode here
without doing a lock_rsb() ?
Look at other checks in v1 to avoid basts. */
if ((type & AST_BAST) && bast)
if (!dlm_modes_compat(lkb->lkb_grmode, bmode))
bast(lkb->lkb_astparam, bmode);
/* this removes the reference added by dlm_add_ast
and may result in the lkb being freed */
dlm_put_lkb(lkb);
schedule();
}
}
static inline int no_asts(void)
{
int ret;
spin_lock(&ast_queue_lock);
ret = list_empty(&ast_queue);
spin_unlock(&ast_queue_lock);
return ret;
}
static int dlm_astd(void *data)
{
while (!kthread_should_stop()) {
set_current_state(TASK_INTERRUPTIBLE);
if (!test_bit(WAKE_ASTS, &astd_wakeflags))
schedule();
set_current_state(TASK_RUNNING);
down(&astd_running);
if (test_and_clear_bit(WAKE_ASTS, &astd_wakeflags))
process_asts();
up(&astd_running);
}
return 0;
}
void dlm_astd_wake(void)
{
if (!no_asts()) {
set_bit(WAKE_ASTS, &astd_wakeflags);
wake_up_process(astd_task);
}
}
int dlm_astd_start(void)
{
struct task_struct *p;
int error = 0;
INIT_LIST_HEAD(&ast_queue);
spin_lock_init(&ast_queue_lock);
init_MUTEX(&astd_running);
p = kthread_run(dlm_astd, NULL, "dlm_astd");
if (IS_ERR(p))
error = PTR_ERR(p);
else
astd_task = p;
return error;
}
void dlm_astd_stop(void)
{
kthread_stop(astd_task);
}
void dlm_astd_suspend(void)
{
down(&astd_running);
}
void dlm_astd_resume(void)
{
up(&astd_running);
}
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __ASTD_DOT_H__
#define __ASTD_DOT_H__
void dlm_add_ast(struct dlm_lkb *lkb, int type);
void dlm_del_ast(struct dlm_lkb *lkb);
void dlm_astd_wake(void);
int dlm_astd_start(void);
void dlm_astd_stop(void);
void dlm_astd_suspend(void);
void dlm_astd_resume(void);
#endif
This diff is collapsed.
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __CONFIG_DOT_H__
#define __CONFIG_DOT_H__
#define DLM_MAX_ADDR_COUNT 3
struct dlm_config_info {
int tcp_port;
int buffer_size;
int rsbtbl_size;
int lkbtbl_size;
int dirtbl_size;
int recover_timer;
int toss_secs;
int scan_secs;
};
extern struct dlm_config_info dlm_config;
int dlm_config_init(void);
void dlm_config_exit(void);
int dlm_node_weight(char *lsname, int nodeid);
int dlm_nodeid_list(char *lsname, int **ids_out);
int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
int dlm_our_nodeid(void);
int dlm_our_addr(struct sockaddr_storage *addr, int num);
#endif /* __CONFIG_DOT_H__ */
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include <linux/pagemap.h>
#include <linux/seq_file.h>
#include <linux/module.h>
#include <linux/ctype.h>
#include <linux/debugfs.h>
#include "dlm_internal.h"
static struct dentry *dlm_root;
struct rsb_iter {
int entry;
struct dlm_ls *ls;
struct list_head *next;
struct dlm_rsb *rsb;
};
static char *print_lockmode(int mode)
{
switch (mode) {
case DLM_LOCK_IV:
return "--";
case DLM_LOCK_NL:
return "NL";
case DLM_LOCK_CR:
return "CR";
case DLM_LOCK_CW:
return "CW";
case DLM_LOCK_PR:
return "PR";
case DLM_LOCK_PW:
return "PW";
case DLM_LOCK_EX:
return "EX";
default:
return "??";
}
}
static void print_lock(struct seq_file *s, struct dlm_lkb *lkb,
struct dlm_rsb *res)
{
seq_printf(s, "%08x %s", lkb->lkb_id, print_lockmode(lkb->lkb_grmode));
if (lkb->lkb_status == DLM_LKSTS_CONVERT
|| lkb->lkb_status == DLM_LKSTS_WAITING)
seq_printf(s, " (%s)", print_lockmode(lkb->lkb_rqmode));
if (lkb->lkb_range) {
/* FIXME: this warns on Alpha */
if (lkb->lkb_status == DLM_LKSTS_CONVERT
|| lkb->lkb_status == DLM_LKSTS_GRANTED)
seq_printf(s, " %" PRIx64 "-%" PRIx64,
lkb->lkb_range[GR_RANGE_START],
lkb->lkb_range[GR_RANGE_END]);
if (lkb->lkb_status == DLM_LKSTS_CONVERT
|| lkb->lkb_status == DLM_LKSTS_WAITING)
seq_printf(s, " (%" PRIx64 "-%" PRIx64 ")",
lkb->lkb_range[RQ_RANGE_START],
lkb->lkb_range[RQ_RANGE_END]);
}
if (lkb->lkb_nodeid) {
if (lkb->lkb_nodeid != res->res_nodeid)
seq_printf(s, " Remote: %3d %08x", lkb->lkb_nodeid,
lkb->lkb_remid);
else
seq_printf(s, " Master: %08x", lkb->lkb_remid);
}
if (lkb->lkb_wait_type)
seq_printf(s, " wait_type: %d", lkb->lkb_wait_type);
seq_printf(s, "\n");
}
static int print_resource(struct dlm_rsb *res, struct seq_file *s)
{
struct dlm_lkb *lkb;
int i, lvblen = res->res_ls->ls_lvblen;
seq_printf(s, "\nResource %p Name (len=%d) \"", res, res->res_length);
for (i = 0; i < res->res_length; i++) {
if (isprint(res->res_name[i]))
seq_printf(s, "%c", res->res_name[i]);
else
seq_printf(s, "%c", '.');
}
if (res->res_nodeid > 0)
seq_printf(s, "\" \nLocal Copy, Master is node %d\n",
res->res_nodeid);
else if (res->res_nodeid == 0)
seq_printf(s, "\" \nMaster Copy\n");
else if (res->res_nodeid == -1)
seq_printf(s, "\" \nLooking up master (lkid %x)\n",
res->res_first_lkid);
else
seq_printf(s, "\" \nInvalid master %d\n", res->res_nodeid);
/* Print the LVB: */
if (res->res_lvbptr) {
seq_printf(s, "LVB: ");
for (i = 0; i < lvblen; i++) {
if (i == lvblen / 2)
seq_printf(s, "\n ");
seq_printf(s, "%02x ",
(unsigned char) res->res_lvbptr[i]);
}
if (rsb_flag(res, RSB_VALNOTVALID))
seq_printf(s, " (INVALID)");
seq_printf(s, "\n");
}
/* Print the locks attached to this resource */
seq_printf(s, "Granted Queue\n");
list_for_each_entry(lkb, &res->res_grantqueue, lkb_statequeue)
print_lock(s, lkb, res);
seq_printf(s, "Conversion Queue\n");
list_for_each_entry(lkb, &res->res_convertqueue, lkb_statequeue)
print_lock(s, lkb, res);
seq_printf(s, "Waiting Queue\n");
list_for_each_entry(lkb, &res->res_waitqueue, lkb_statequeue)
print_lock(s, lkb, res);
return 0;
}
static int rsb_iter_next(struct rsb_iter *ri)
{
struct dlm_ls *ls = ri->ls;
int i;
if (!ri->next) {
top:
/* Find the next non-empty hash bucket */
for (i = ri->entry; i < ls->ls_rsbtbl_size; i++) {
read_lock(&ls->ls_rsbtbl[i].lock);
if (!list_empty(&ls->ls_rsbtbl[i].list)) {
ri->next = ls->ls_rsbtbl[i].list.next;
read_unlock(&ls->ls_rsbtbl[i].lock);
break;
}
read_unlock(&ls->ls_rsbtbl[i].lock);
}
ri->entry = i;
if (ri->entry >= ls->ls_rsbtbl_size)
return 1;
} else {
i = ri->entry;
read_lock(&ls->ls_rsbtbl[i].lock);
ri->next = ri->next->next;
if (ri->next->next == ls->ls_rsbtbl[i].list.next) {
/* End of list - move to next bucket */
ri->next = NULL;
ri->entry++;
read_unlock(&ls->ls_rsbtbl[i].lock);
goto top;
}
read_unlock(&ls->ls_rsbtbl[i].lock);
}
ri->rsb = list_entry(ri->next, struct dlm_rsb, res_hashchain);
return 0;
}
static void rsb_iter_free(struct rsb_iter *ri)
{
kfree(ri);
}
static struct rsb_iter *rsb_iter_init(struct dlm_ls *ls)
{
struct rsb_iter *ri;
ri = kmalloc(sizeof *ri, GFP_KERNEL);
if (!ri)
return NULL;
ri->ls = ls;
ri->entry = 0;
ri->next = NULL;
if (rsb_iter_next(ri)) {
rsb_iter_free(ri);
return NULL;
}
return ri;
}
static void *seq_start(struct seq_file *file, loff_t *pos)
{
struct rsb_iter *ri;
loff_t n = *pos;
ri = rsb_iter_init(file->private);
if (!ri)
return NULL;
while (n--) {
if (rsb_iter_next(ri)) {
rsb_iter_free(ri);
return NULL;
}
}
return ri;
}
static void *seq_next(struct seq_file *file, void *iter_ptr, loff_t *pos)
{
struct rsb_iter *ri = iter_ptr;
(*pos)++;
if (rsb_iter_next(ri)) {
rsb_iter_free(ri);
return NULL;
}
return ri;
}
static void seq_stop(struct seq_file *file, void *iter_ptr)
{
/* nothing for now */
}
static int seq_show(struct seq_file *file, void *iter_ptr)
{
struct rsb_iter *ri = iter_ptr;
print_resource(ri->rsb, file);
return 0;
}
static struct seq_operations dlm_seq_ops = {
.start = seq_start,
.next = seq_next,
.stop = seq_stop,
.show = seq_show,
};
static int do_open(struct inode *inode, struct file *file)
{
struct seq_file *seq;
int ret;
ret = seq_open(file, &dlm_seq_ops);
if (ret)
return ret;
seq = file->private_data;
seq->private = inode->u.generic_ip;
return 0;
}
static struct file_operations dlm_fops = {
.owner = THIS_MODULE,
.open = do_open,
.read = seq_read,
.llseek = seq_lseek,
.release = seq_release
};
int dlm_create_debug_file(struct dlm_ls *ls)
{
ls->ls_debug_dentry = debugfs_create_file(ls->ls_name,
S_IFREG | S_IRUGO,
dlm_root,
ls,
&dlm_fops);
return ls->ls_debug_dentry ? 0 : -ENOMEM;
}
void dlm_delete_debug_file(struct dlm_ls *ls)
{
if (ls->ls_debug_dentry)
debugfs_remove(ls->ls_debug_dentry);
}
int dlm_register_debugfs(void)
{
dlm_root = debugfs_create_dir("dlm", NULL);
return dlm_root ? 0 : -ENOMEM;
}
void dlm_unregister_debugfs(void)
{
debugfs_remove(dlm_root);
}
This diff is collapsed.
This diff is collapsed.
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __DIR_DOT_H__
#define __DIR_DOT_H__
int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int len);
void dlm_dir_clear(struct dlm_ls *ls);
void dlm_clear_free_entries(struct dlm_ls *ls);
int dlm_recover_directory(struct dlm_ls *ls);
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
int *r_nodeid);
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid);
#endif /* __DIR_DOT_H__ */
This diff is collapsed.
This diff is collapsed.
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __LOCK_DOT_H__
#define __LOCK_DOT_H__
void dlm_print_rsb(struct dlm_rsb *r);
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
int dlm_modes_compat(int mode1, int mode2);
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
unsigned int flags, struct dlm_rsb **r_ret);
void dlm_put_rsb(struct dlm_rsb *r);
void dlm_hold_rsb(struct dlm_rsb *r);
int dlm_put_lkb(struct dlm_lkb *lkb);
void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_purge_locks(struct dlm_ls *ls);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
int dlm_grant_after_purge(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
static inline int is_master(struct dlm_rsb *r)
{
return !r->res_nodeid;
}
static inline void lock_rsb(struct dlm_rsb *r)
{
down(&r->res_sem);
}
static inline void unlock_rsb(struct dlm_rsb *r)
{
up(&r->res_sem);
}
#endif
This diff is collapsed.
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __LOCKSPACE_DOT_H__
#define __LOCKSPACE_DOT_H__
int dlm_lockspace_init(void);
void dlm_lockspace_exit(void);
struct dlm_ls *dlm_find_lockspace_global(uint32_t id);
struct dlm_ls *dlm_find_lockspace_local(void *id);
void dlm_put_lockspace(struct dlm_ls *ls);
#endif /* __LOCKSPACE_DOT_H__ */
This diff is collapsed.
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __LOWCOMMS_DOT_H__
#define __LOWCOMMS_DOT_H__
int dlm_lowcomms_init(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_start(void);
void dlm_lowcomms_stop(void);
void *dlm_lowcomms_get_buffer(int nodeid, int len, int allocation, char **ppc);
void dlm_lowcomms_commit_buffer(void *mh);
#endif /* __LOWCOMMS_DOT_H__ */
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __LVB_TABLE_DOT_H__
#define __LVB_TABLE_DOT_H__
extern const int dlm_lvb_operations[8][8];
#endif
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "lockspace.h"
#include "lock.h"
#include "memory.h"
#include "lowcomms.h"
#include "config.h"
#ifdef CONFIG_DLM_DEBUG
int dlm_register_debugfs(void);
void dlm_unregister_debugfs(void);
#else
static inline int dlm_register_debugfs(void) { return 0; }
static inline void dlm_unregister_debugfs(void) { }
#endif
static int __init init_dlm(void)
{
int error;
error = dlm_memory_init();
if (error)
goto out;
error = dlm_lockspace_init();
if (error)
goto out_mem;
error = dlm_config_init();
if (error)
goto out_lockspace;
error = dlm_register_debugfs();
if (error)
goto out_config;
error = dlm_lowcomms_init();
if (error)
goto out_debug;
printk("DLM (built %s %s) installed\n", __DATE__, __TIME__);
return 0;
out_debug:
dlm_unregister_debugfs();
out_config:
dlm_config_exit();
out_lockspace:
dlm_lockspace_exit();
out_mem:
dlm_memory_exit();
out:
return error;
}
static void __exit exit_dlm(void)
{
dlm_lowcomms_exit();
dlm_config_exit();
dlm_memory_exit();
dlm_lockspace_exit();
dlm_unregister_debugfs();
}
module_init(init_dlm);
module_exit(exit_dlm);
MODULE_DESCRIPTION("Distributed Lock Manager");
MODULE_AUTHOR("Red Hat, Inc.");
MODULE_LICENSE("GPL");
EXPORT_SYMBOL_GPL(dlm_new_lockspace);
EXPORT_SYMBOL_GPL(dlm_release_lockspace);
EXPORT_SYMBOL_GPL(dlm_lock);
EXPORT_SYMBOL_GPL(dlm_unlock);
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "lowcomms.h"
#include "rcom.h"
#include "config.h"
/*
* Following called by dlm_recoverd thread
*/
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
struct dlm_member *memb = NULL;
struct list_head *tmp;
struct list_head *newlist = &new->list;
struct list_head *head = &ls->ls_nodes;
list_for_each(tmp, head) {
memb = list_entry(tmp, struct dlm_member, list);
if (new->nodeid < memb->nodeid)
break;
}
if (!memb)
list_add_tail(newlist, head);
else {
/* FIXME: can use list macro here */
newlist->prev = tmp->prev;
newlist->next = tmp;
tmp->prev->next = newlist;
tmp->prev = newlist;
}
}
static int dlm_add_member(struct dlm_ls *ls, int nodeid)
{
struct dlm_member *memb;
int w;
memb = kmalloc(sizeof(struct dlm_member), GFP_KERNEL);
if (!memb)
return -ENOMEM;
w = dlm_node_weight(ls->ls_name, nodeid);
if (w < 0)
return w;
memb->nodeid = nodeid;
memb->weight = w;
add_ordered_member(ls, memb);
ls->ls_num_nodes++;
return 0;
}
static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
{
list_move(&memb->list, &ls->ls_nodes_gone);
ls->ls_num_nodes--;
}
static int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
struct dlm_member *memb;
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->nodeid == nodeid)
return TRUE;
}
return FALSE;
}
int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
struct dlm_member *memb;
list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
if (memb->nodeid == nodeid)
return TRUE;
}
return FALSE;
}
static void clear_memb_list(struct list_head *head)
{
struct dlm_member *memb;
while (!list_empty(head)) {
memb = list_entry(head->next, struct dlm_member, list);
list_del(&memb->list);
kfree(memb);
}
}
void dlm_clear_members(struct dlm_ls *ls)
{
clear_memb_list(&ls->ls_nodes);
ls->ls_num_nodes = 0;
}
void dlm_clear_members_gone(struct dlm_ls *ls)
{
clear_memb_list(&ls->ls_nodes_gone);
}
static void make_member_array(struct dlm_ls *ls)
{
struct dlm_member *memb;
int i, w, x = 0, total = 0, all_zero = 0, *array;
kfree(ls->ls_node_array);
ls->ls_node_array = NULL;
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->weight)
total += memb->weight;
}
/* all nodes revert to weight of 1 if all have weight 0 */
if (!total) {
total = ls->ls_num_nodes;
all_zero = 1;
}
ls->ls_total_weight = total;
array = kmalloc(sizeof(int) * total, GFP_KERNEL);
if (!array)
return;
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (!all_zero && !memb->weight)
continue;
if (all_zero)
w = 1;
else
w = memb->weight;
DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
for (i = 0; i < w; i++)
array[x++] = memb->nodeid;
}
ls->ls_node_array = array;
}
/* send a status request to all members just to establish comms connections */
static void ping_members(struct dlm_ls *ls)
{
struct dlm_member *memb;
list_for_each_entry(memb, &ls->ls_nodes, list)
dlm_rcom_status(ls, memb->nodeid);
}
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
struct dlm_member *memb, *safe;
int i, error, found, pos = 0, neg = 0, low = -1;
/* move departed members from ls_nodes to ls_nodes_gone */
list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
found = FALSE;
for (i = 0; i < rv->node_count; i++) {
if (memb->nodeid == rv->nodeids[i]) {
found = TRUE;
break;
}
}
if (!found) {
neg++;
dlm_remove_member(ls, memb);
log_debug(ls, "remove member %d", memb->nodeid);
}
}
/* add new members to ls_nodes */
for (i = 0; i < rv->node_count; i++) {
if (dlm_is_member(ls, rv->nodeids[i]))
continue;
dlm_add_member(ls, rv->nodeids[i]);
pos++;
log_debug(ls, "add member %d", rv->nodeids[i]);
}
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (low == -1 || memb->nodeid < low)
low = memb->nodeid;
}
ls->ls_low_nodeid = low;
make_member_array(ls);
dlm_set_recover_status(ls, DLM_RS_NODES);
*neg_out = neg;
ping_members(ls);
error = dlm_recover_members_wait(ls);
log_debug(ls, "total members %d", ls->ls_num_nodes);
return error;
}
/*
* Following called from lockspace.c
*/
int dlm_ls_stop(struct dlm_ls *ls)
{
int new;
/*
* A stop cancels any recovery that's in progress (see RECOVERY_STOP,
* dlm_recovery_stopped()) and prevents any new locks from being
* processed (see RUNNING, dlm_locking_stopped()).
*/
spin_lock(&ls->ls_recover_lock);
set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
ls->ls_recover_seq++;
spin_unlock(&ls->ls_recover_lock);
/*
* This in_recovery lock does two things:
*
* 1) Keeps this function from returning until all threads are out
* of locking routines and locking is truely stopped.
* 2) Keeps any new requests from being processed until it's unlocked
* when recovery is complete.
*/
if (new)
down_write(&ls->ls_in_recovery);
/*
* The recoverd suspend/resume makes sure that dlm_recoverd (if
* running) has noticed the clearing of RUNNING above and quit
* processing the previous recovery. This will be true for all nodes
* before any nodes start the new recovery.
*/
dlm_recoverd_suspend(ls);
ls->ls_recover_status = 0;
dlm_recoverd_resume(ls);
return 0;
}
int dlm_ls_start(struct dlm_ls *ls)
{
struct dlm_recover *rv = NULL, *rv_old;
int *ids = NULL;
int error, count;
rv = kmalloc(sizeof(struct dlm_recover), GFP_KERNEL);
if (!rv)
return -ENOMEM;
memset(rv, 0, sizeof(struct dlm_recover));
error = count = dlm_nodeid_list(ls->ls_name, &ids);
if (error <= 0)
goto fail;
spin_lock(&ls->ls_recover_lock);
/* the lockspace needs to be stopped before it can be started */
if (!dlm_locking_stopped(ls)) {
spin_unlock(&ls->ls_recover_lock);
log_error(ls, "start ignored: lockspace running");
error = -EINVAL;
goto fail;
}
rv->nodeids = ids;
rv->node_count = count;
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
spin_unlock(&ls->ls_recover_lock);
if (rv_old) {
kfree(rv_old->nodeids);
kfree(rv_old);
}
dlm_recoverd_kick(ls);
return 0;
fail:
kfree(rv);
kfree(ids);
return error;
}
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __MEMBER_DOT_H__
#define __MEMBER_DOT_H__
int dlm_ls_stop(struct dlm_ls *ls);
int dlm_ls_start(struct dlm_ls *ls);
void dlm_clear_members(struct dlm_ls *ls);
void dlm_clear_members_gone(struct dlm_ls *ls);
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
int dlm_is_removed(struct dlm_ls *ls, int nodeid);
#endif /* __MEMBER_DOT_H__ */
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "config.h"
#include "memory.h"
static kmem_cache_t *lkb_cache;
int dlm_memory_init(void)
{
int ret = 0;
lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
__alignof__(struct dlm_lkb), 0, NULL, NULL);
if (!lkb_cache)
ret = -ENOMEM;
return ret;
}
void dlm_memory_exit(void)
{
if (lkb_cache)
kmem_cache_destroy(lkb_cache);
}
char *allocate_lvb(struct dlm_ls *ls)
{
char *p;
p = kmalloc(ls->ls_lvblen, GFP_KERNEL);
if (p)
memset(p, 0, ls->ls_lvblen);
return p;
}
void free_lvb(char *p)
{
kfree(p);
}
uint64_t *allocate_range(struct dlm_ls *ls)
{
int ralen = 4*sizeof(uint64_t);
uint64_t *p;
p = kmalloc(ralen, GFP_KERNEL);
if (p)
memset(p, 0, ralen);
return p;
}
void free_range(uint64_t *p)
{
kfree(p);
}
/* FIXME: have some minimal space built-in to rsb for the name and
kmalloc a separate name if needed, like dentries are done */
struct dlm_rsb *allocate_rsb(struct dlm_ls *ls, int namelen)
{
struct dlm_rsb *r;
DLM_ASSERT(namelen <= DLM_RESNAME_MAXLEN,);
r = kmalloc(sizeof(*r) + namelen, GFP_KERNEL);
if (r)
memset(r, 0, sizeof(*r) + namelen);
return r;
}
void free_rsb(struct dlm_rsb *r)
{
if (r->res_lvbptr)
free_lvb(r->res_lvbptr);
kfree(r);
}
struct dlm_lkb *allocate_lkb(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
lkb = kmem_cache_alloc(lkb_cache, GFP_KERNEL);
if (lkb)
memset(lkb, 0, sizeof(*lkb));
return lkb;
}
void free_lkb(struct dlm_lkb *lkb)
{
kmem_cache_free(lkb_cache, lkb);
}
struct dlm_direntry *allocate_direntry(struct dlm_ls *ls, int namelen)
{
struct dlm_direntry *de;
DLM_ASSERT(namelen <= DLM_RESNAME_MAXLEN,);
de = kmalloc(sizeof(*de) + namelen, GFP_KERNEL);
if (de)
memset(de, 0, sizeof(*de) + namelen);
return de;
}
void free_direntry(struct dlm_direntry *de)
{
kfree(de);
}
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __MEMORY_DOT_H__
#define __MEMORY_DOT_H__
int dlm_memory_init(void);
void dlm_memory_exit(void);
struct dlm_rsb *allocate_rsb(struct dlm_ls *ls, int namelen);
void free_rsb(struct dlm_rsb *r);
struct dlm_lkb *allocate_lkb(struct dlm_ls *ls);
void free_lkb(struct dlm_lkb *l);
struct dlm_direntry *allocate_direntry(struct dlm_ls *ls, int namelen);
void free_direntry(struct dlm_direntry *de);
char *allocate_lvb(struct dlm_ls *ls);
void free_lvb(char *l);
uint64_t *allocate_range(struct dlm_ls *ls);
void free_range(uint64_t *l);
#endif /* __MEMORY_DOT_H__ */
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/*
* midcomms.c
*
* This is the appallingly named "mid-level" comms layer.
*
* Its purpose is to take packets from the "real" comms layer,
* split them up into packets and pass them to the interested
* part of the locking mechanism.
*
* It also takes messages from the locking layer, formats them
* into packets and sends them to the comms layer.
*/
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "rcom.h"
#include "lock.h"
#include "midcomms.h"
static void copy_from_cb(void *dst, const void *base, unsigned offset,
unsigned len, unsigned limit)
{
unsigned copy = len;
if ((copy + offset) > limit)
copy = limit - offset;
memcpy(dst, base + offset, copy);
len -= copy;
if (len)
memcpy(dst + copy, base, len);
}
/*
* Called from the low-level comms layer to process a buffer of
* commands.
*
* Only complete messages are processed here, any "spare" bytes from
* the end of a buffer are saved and tacked onto the front of the next
* message that comes in. I doubt this will happen very often but we
* need to be able to cope with it and I don't want the task to be waiting
* for packets to come in when there is useful work to be done.
*/
int dlm_process_incoming_buffer(int nodeid, const void *base,
unsigned offset, unsigned len, unsigned limit)
{
unsigned char __tmp[DLM_INBUF_LEN];
struct dlm_header *msg = (struct dlm_header *) __tmp;
int ret = 0;
int err = 0;
uint16_t msglen;
uint32_t lockspace;
while (len > sizeof(struct dlm_header)) {
/* Copy just the header to check the total length. The
message may wrap around the end of the buffer back to the
start, so we need to use a temp buffer and copy_from_cb. */
copy_from_cb(msg, base, offset, sizeof(struct dlm_header),
limit);
msglen = le16_to_cpu(msg->h_length);
lockspace = msg->h_lockspace;
err = -EINVAL;
if (msglen < sizeof(struct dlm_header))
break;
err = -E2BIG;
if (msglen > dlm_config.buffer_size) {
log_print("message size %d from %d too big, buf len %d",
msglen, nodeid, len);
break;
}
err = 0;
/* If only part of the full message is contained in this
buffer, then do nothing and wait for lowcomms to call
us again later with more data. We return 0 meaning
we've consumed none of the input buffer. */
if (msglen > len)
break;
/* Allocate a larger temp buffer if the full message won't fit
in the buffer on the stack (which should work for most
ordinary messages). */
if (msglen > sizeof(__tmp) &&
msg == (struct dlm_header *) __tmp) {
msg = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
if (msg == NULL)
return ret;
}
copy_from_cb(msg, base, offset, msglen, limit);
BUG_ON(lockspace != msg->h_lockspace);
ret += msglen;
offset += msglen;
offset &= (limit - 1);
len -= msglen;
switch (msg->h_cmd) {
case DLM_MSG:
dlm_receive_message(msg, nodeid, FALSE);
break;
case DLM_RCOM:
dlm_receive_rcom(msg, nodeid);
break;
default:
log_print("unknown msg type %x from %u: %u %u %u %u",
msg->h_cmd, nodeid, msglen, len, offset, ret);
}
}
if (msg != (struct dlm_header *) __tmp)
kfree(msg);
return err ? err : ret;
}
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __MIDCOMMS_DOT_H__
#define __MIDCOMMS_DOT_H__
int dlm_process_incoming_buffer(int nodeid, const void *base, unsigned offset,
unsigned len, unsigned limit);
#endif /* __MIDCOMMS_DOT_H__ */
This diff is collapsed.
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#ifndef __RCOM_DOT_H__
#define __RCOM_DOT_H__
int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
void dlm_receive_rcom(struct dlm_header *hd, int nodeid);
#endif
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment