Commit 6ed7257b authored by Patrick Caulfield's avatar Patrick Caulfield Committed by Steven Whitehouse

[DLM] Consolidate transport protocols

This patch consolidates the TCP & SCTP protocols for the DLM into a single file
and makes it switchable at run-time (well, at least before the DLM actually
starts up!)

For RHEL5 this patch requires Neil Horman's patch that expands the in-kernel
socket API but that has already been twice ACKed so it should be OK.

The patch adds a new lowcomms.c file that replaces the existing lowcomms-sctp.c
& lowcomms-tcp.c files.
Signed-off-By: default avatarPatrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: default avatarSteven Whitehouse <swhiteho@redhat.com>
parent fc7c44f0
......@@ -3,30 +3,13 @@ menu "Distributed Lock Manager"
config DLM
tristate "Distributed Lock Manager (DLM)"
depends on SYSFS && (IPV6 || IPV6=n)
depends on IPV6 || IPV6=n
select CONFIGFS_FS
select IP_SCTP if DLM_SCTP
select IP_SCTP
help
A general purpose distributed lock manager for kernel or userspace
applications.
choice
prompt "Select DLM communications protocol"
depends on DLM
default DLM_TCP
help
The DLM Can use TCP or SCTP for it's network communications.
SCTP supports multi-homed operations whereas TCP doesn't.
However, SCTP seems to have stability problems at the moment.
config DLM_TCP
bool "TCP/IP"
config DLM_SCTP
bool "SCTP"
endchoice
config DLM_DEBUG
bool "DLM debugging"
depends on DLM
......
......@@ -8,6 +8,7 @@ dlm-y := ast.o \
member.o \
memory.o \
midcomms.o \
lowcomms.o \
rcom.o \
recover.o \
recoverd.o \
......@@ -16,6 +17,3 @@ dlm-y := ast.o \
util.o
dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o
dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o
dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o
\ No newline at end of file
......@@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
......@@ -89,6 +89,7 @@ struct cluster {
unsigned int cl_toss_secs;
unsigned int cl_scan_secs;
unsigned int cl_log_debug;
unsigned int cl_protocol;
};
enum {
......@@ -101,6 +102,7 @@ enum {
CLUSTER_ATTR_TOSS_SECS,
CLUSTER_ATTR_SCAN_SECS,
CLUSTER_ATTR_LOG_DEBUG,
CLUSTER_ATTR_PROTOCOL,
};
struct cluster_attribute {
......@@ -159,6 +161,7 @@ CLUSTER_ATTR(recover_timer, 1);
CLUSTER_ATTR(toss_secs, 1);
CLUSTER_ATTR(scan_secs, 1);
CLUSTER_ATTR(log_debug, 0);
CLUSTER_ATTR(protocol, 0);
static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
......@@ -170,6 +173,7 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
[CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
[CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
NULL,
};
......@@ -904,6 +908,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_TOSS_SECS 10
#define DEFAULT_SCAN_SECS 5
#define DEFAULT_LOG_DEBUG 0
#define DEFAULT_PROTOCOL 0
struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
......@@ -914,6 +919,7 @@ struct dlm_config_info dlm_config = {
.ci_recover_timer = DEFAULT_RECOVER_TIMER,
.ci_toss_secs = DEFAULT_TOSS_SECS,
.ci_scan_secs = DEFAULT_SCAN_SECS,
.ci_log_debug = DEFAULT_LOG_DEBUG
.ci_log_debug = DEFAULT_LOG_DEBUG,
.ci_protocol = DEFAULT_PROTOCOL
};
......@@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
......@@ -26,6 +26,7 @@ struct dlm_config_info {
int ci_toss_secs;
int ci_scan_secs;
int ci_log_debug;
int ci_protocol;
};
extern struct dlm_config_info dlm_config;
......
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/*
* lowcomms.c
*
* This is the "low-level" comms layer.
*
* It is responsible for sending/receiving messages
* from other nodes in the cluster.
*
* Cluster nodes are referred to by their nodeids. nodeids are
* simply 32 bit numbers to the locking module - if they need to
* be expanded for the cluster infrastructure then that is it's
* responsibility. It is this layer's
* responsibility to resolve these into IP address or
* whatever it needs for inter-node communication.
*
* The comms level is two kernel threads that deal mainly with
* the receiving of messages from other nodes and passing them
* up to the mid-level comms layer (which understands the
* message format) for execution by the locking core, and
* a send thread which does all the setting up of connections
* to remote nodes and the sending of data. Threads are not allowed
* to send their own data because it may cause them to wait in times
* of high load. Also, this way, the sending thread can collect together
* messages bound for one node and send them in one block.
*
* I don't see any problem with the recv thread executing the locking
* code on behalf of remote processes as the locking code is
* short, efficient and never (well, hardly ever) waits.
*
*/
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <net/sctp/user.h>
#include <linux/pagemap.h>
#include <linux/socket.h>
#include <linux/idr.h>
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "midcomms.h"
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_local_nodeid;
/* One of these per connected node */
#define NI_INIT_PENDING 1
#define NI_WRITE_PENDING 2
struct nodeinfo {
spinlock_t lock;
sctp_assoc_t assoc_id;
unsigned long flags;
struct list_head write_list; /* nodes with pending writes */
struct list_head writequeue; /* outgoing writequeue_entries */
spinlock_t writequeue_lock;
int nodeid;
struct work_struct swork; /* Send workqueue */
struct work_struct lwork; /* Locking workqueue */
};
static DEFINE_IDR(nodeinfo_idr);
static DECLARE_RWSEM(nodeinfo_lock);
static int max_nodeid;
struct cbuf {
unsigned int base;
unsigned int len;
unsigned int mask;
};
/* Just the one of these, now. But this struct keeps
the connection-specific variables together */
#define CF_READ_PENDING 1
struct connection {
struct socket *sock;
unsigned long flags;
struct page *rx_page;
atomic_t waiting_requests;
struct cbuf cb;
int eagain_flag;
struct work_struct work; /* Send workqueue */
};
/* An entry waiting to be sent */
struct writequeue_entry {
struct list_head list;
struct page *page;
int offset;
int len;
int end;
int users;
struct nodeinfo *ni;
};
static void cbuf_add(struct cbuf *cb, int n)
{
cb->len += n;
}
static int cbuf_data(struct cbuf *cb)
{
return ((cb->base + cb->len) & cb->mask);
}
static void cbuf_init(struct cbuf *cb, int size)
{
cb->base = cb->len = 0;
cb->mask = size-1;
}
static void cbuf_eat(struct cbuf *cb, int n)
{
cb->len -= n;
cb->base += n;
cb->base &= cb->mask;
}
/* List of nodes which have writes pending */
static LIST_HEAD(write_nodes);
static DEFINE_SPINLOCK(write_nodes_lock);
/* Maximum number of incoming messages to process before
* doing a schedule()
*/
#define MAX_RX_MSG_COUNT 25
/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;
static struct workqueue_struct *lock_workqueue;
/* The SCTP connection */
static struct connection sctp_con;
static void process_send_sockets(struct work_struct *work);
static void process_recv_sockets(struct work_struct *work);
static void process_lock_request(struct work_struct *work);
static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
{
struct sockaddr_storage addr;
int error;
if (!dlm_local_count)
return -1;
error = dlm_nodeid_to_addr(nodeid, &addr);
if (error)
return error;
if (dlm_local_addr[0]->ss_family == AF_INET) {
struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
} else {
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
memcpy(&ret6->sin6_addr, &in6->sin6_addr,
sizeof(in6->sin6_addr));
}
return 0;
}
/* If alloc is 0 here we will not attempt to allocate a new
nodeinfo struct */
static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
{
struct nodeinfo *ni;
int r;
int n;
down_read(&nodeinfo_lock);
ni = idr_find(&nodeinfo_idr, nodeid);
up_read(&nodeinfo_lock);
if (ni || !alloc)
return ni;
down_write(&nodeinfo_lock);
ni = idr_find(&nodeinfo_idr, nodeid);
if (ni)
goto out_up;
r = idr_pre_get(&nodeinfo_idr, alloc);
if (!r)
goto out_up;
ni = kmalloc(sizeof(struct nodeinfo), alloc);
if (!ni)
goto out_up;
r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
if (r) {
kfree(ni);
ni = NULL;
goto out_up;
}
if (n != nodeid) {
idr_remove(&nodeinfo_idr, n);
kfree(ni);
ni = NULL;
goto out_up;
}
memset(ni, 0, sizeof(struct nodeinfo));
spin_lock_init(&ni->lock);
INIT_LIST_HEAD(&ni->writequeue);
spin_lock_init(&ni->writequeue_lock);
INIT_WORK(&ni->lwork, process_lock_request);
INIT_WORK(&ni->swork, process_send_sockets);
ni->nodeid = nodeid;
if (nodeid > max_nodeid)
max_nodeid = nodeid;
out_up:
up_write(&nodeinfo_lock);
return ni;
}
/* Don't call this too often... */
static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
{
int i;
struct nodeinfo *ni;
for (i=1; i<=max_nodeid; i++) {
ni = nodeid2nodeinfo(i, 0);
if (ni && ni->assoc_id == assoc)
return ni;
}
return NULL;
}
/* Data or notification available on socket */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
queue_work(recv_workqueue, &sctp_con.work);
}
/* Add the port number to an IP6 or 4 sockaddr and return the address length.
Also padd out the struct with zeros to make comparisons meaningful */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
int *addr_len)
{
struct sockaddr_in *local4_addr;
struct sockaddr_in6 *local6_addr;
if (!dlm_local_count)
return;
if (!port) {
if (dlm_local_addr[0]->ss_family == AF_INET) {
local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
port = be16_to_cpu(local4_addr->sin_port);
} else {
local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
port = be16_to_cpu(local6_addr->sin6_port);
}
}
saddr->ss_family = dlm_local_addr[0]->ss_family;
if (dlm_local_addr[0]->ss_family == AF_INET) {
struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
in4_addr->sin_port = cpu_to_be16(port);
memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
sizeof(struct sockaddr_in));
*addr_len = sizeof(struct sockaddr_in);
} else {
struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
in6_addr->sin6_port = cpu_to_be16(port);
memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
sizeof(struct sockaddr_in6));
*addr_len = sizeof(struct sockaddr_in6);
}
}
/* Close the connection and tidy up */
static void close_connection(void)
{
if (sctp_con.sock) {
sock_release(sctp_con.sock);
sctp_con.sock = NULL;
}
if (sctp_con.rx_page) {
__free_page(sctp_con.rx_page);
sctp_con.rx_page = NULL;
}
}
/* We only send shutdown messages to nodes that are not part of the cluster */
static void send_shutdown(sctp_assoc_t associd)
{
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
struct msghdr outmessage;
struct cmsghdr *cmsg;
struct sctp_sndrcvinfo *sinfo;
int ret;
outmessage.msg_name = NULL;
outmessage.msg_namelen = 0;
outmessage.msg_control = outcmsg;
outmessage.msg_controllen = sizeof(outcmsg);
outmessage.msg_flags = MSG_EOR;
cmsg = CMSG_FIRSTHDR(&outmessage);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
outmessage.msg_controllen = cmsg->cmsg_len;
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_flags |= MSG_EOF;
sinfo->sinfo_assoc_id = associd;
ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
if (ret != 0)
log_print("send EOF to node failed: %d", ret);
}
/* INIT failed but we don't know which node...
restart INIT on all pending nodes */
static void init_failed(void)
{
int i;
struct nodeinfo *ni;
for (i=1; i<=max_nodeid; i++) {
ni = nodeid2nodeinfo(i, 0);
if (!ni)
continue;
if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
ni->assoc_id = 0;
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
}
}
}
/* Something happened to an association */
static void process_sctp_notification(struct msghdr *msg, char *buf)
{
union sctp_notification *sn = (union sctp_notification *)buf;
if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
switch (sn->sn_assoc_change.sac_state) {
case SCTP_COMM_UP:
case SCTP_RESTART:
{
/* Check that the new node is in the lockspace */
struct sctp_prim prim;
mm_segment_t fs;
int nodeid;
int prim_len, ret;
int addr_len;
struct nodeinfo *ni;
/* This seems to happen when we received a connection
* too early... or something... anyway, it happens but
* we always seem to get a real message too, see
* receive_from_sock */
if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
log_print("COMM_UP for invalid assoc ID %d",
(int)sn->sn_assoc_change.sac_assoc_id);
init_failed();
return;
}
memset(&prim, 0, sizeof(struct sctp_prim));
prim_len = sizeof(struct sctp_prim);
prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
fs = get_fs();
set_fs(get_ds());
ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
IPPROTO_SCTP,
SCTP_PRIMARY_ADDR,
(char*)&prim,
&prim_len);
set_fs(fs);
if (ret < 0) {
struct nodeinfo *ni;
log_print("getsockopt/sctp_primary_addr on "
"new assoc %d failed : %d",
(int)sn->sn_assoc_change.sac_assoc_id,
ret);
/* Retry INIT later */
ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
if (ni)
clear_bit(NI_INIT_PENDING, &ni->flags);
return;
}
make_sockaddr(&prim.ssp_addr, 0, &addr_len);
if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
log_print("reject connect from unknown addr");
send_shutdown(prim.ssp_assoc_id);
return;
}
ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
if (!ni)
return;
/* Save the assoc ID */
ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
log_print("got new/restarted association %d nodeid %d",
(int)sn->sn_assoc_change.sac_assoc_id, nodeid);
/* Send any pending writes */
clear_bit(NI_INIT_PENDING, &ni->flags);
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
}
break;
case SCTP_COMM_LOST:
case SCTP_SHUTDOWN_COMP:
{
struct nodeinfo *ni;
ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
if (ni) {
spin_lock(&ni->lock);
ni->assoc_id = 0;
spin_unlock(&ni->lock);
}
}
break;
/* We don't know which INIT failed, so clear the PENDING flags
* on them all. if assoc_id is zero then it will then try
* again */
case SCTP_CANT_STR_ASSOC:
{
log_print("Can't start SCTP association - retrying");
init_failed();
}
break;
default:
log_print("unexpected SCTP assoc change id=%d state=%d",
(int)sn->sn_assoc_change.sac_assoc_id,
sn->sn_assoc_change.sac_state);
}
}
}
/* Data received from remote end */
static int receive_from_sock(void)
{
int ret = 0;
struct msghdr msg;
struct kvec iov[2];
unsigned len;
int r;
struct sctp_sndrcvinfo *sinfo;
struct cmsghdr *cmsg;
struct nodeinfo *ni;
/* These two are marginally too big for stack allocation, but this
* function is (currently) only called by dlm_recvd so static should be
* OK.
*/
static struct sockaddr_storage msgname;
static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
if (sctp_con.sock == NULL)
goto out;
if (sctp_con.rx_page == NULL) {
/*
* This doesn't need to be atomic, but I think it should
* improve performance if it is.
*/
sctp_con.rx_page = alloc_page(GFP_ATOMIC);
if (sctp_con.rx_page == NULL)
goto out_resched;
cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
}
memset(&incmsg, 0, sizeof(incmsg));
memset(&msgname, 0, sizeof(msgname));
msg.msg_name = &msgname;
msg.msg_namelen = sizeof(msgname);
msg.msg_flags = 0;
msg.msg_control = incmsg;
msg.msg_controllen = sizeof(incmsg);
msg.msg_iovlen = 1;
/* I don't see why this circular buffer stuff is necessary for SCTP
* which is a packet-based protocol, but the whole thing breaks under
* load without it! The overhead is minimal (and is in the TCP lowcomms
* anyway, of course) so I'll leave it in until I can figure out what's
* really happening.
*/
/*
* iov[0] is the bit of the circular buffer between the current end
* point (cb.base + cb.len) and the end of the buffer.
*/
iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
iov[0].iov_base = page_address(sctp_con.rx_page) +
cbuf_data(&sctp_con.cb);
iov[1].iov_len = 0;
/*
* iov[1] is the bit of the circular buffer between the start of the
* buffer and the start of the currently used section (cb.base)
*/
if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
iov[1].iov_len = sctp_con.cb.base;
iov[1].iov_base = page_address(sctp_con.rx_page);
msg.msg_iovlen = 2;
}
len = iov[0].iov_len + iov[1].iov_len;
r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
MSG_NOSIGNAL | MSG_DONTWAIT);
if (ret <= 0)
goto out_close;
msg.msg_control = incmsg;
msg.msg_controllen = sizeof(incmsg);
cmsg = CMSG_FIRSTHDR(&msg);
sinfo = CMSG_DATA(cmsg);
if (msg.msg_flags & MSG_NOTIFICATION) {
process_sctp_notification(&msg, page_address(sctp_con.rx_page));
return 0;
}
/* Is this a new association ? */
ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
if (ni) {
ni->assoc_id = sinfo->sinfo_assoc_id;
if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
}
}
/* INIT sends a message with length of 1 - ignore it */
if (r == 1)
return 0;
cbuf_add(&sctp_con.cb, ret);
// PJC: TODO: Add to node's workqueue....can we ??
ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
page_address(sctp_con.rx_page),
sctp_con.cb.base, sctp_con.cb.len,
PAGE_CACHE_SIZE);
if (ret < 0)
goto out_close;
cbuf_eat(&sctp_con.cb, ret);
out:
ret = 0;
goto out_ret;
out_resched:
lowcomms_data_ready(sctp_con.sock->sk, 0);
ret = 0;
cond_resched();
goto out_ret;
out_close:
if (ret != -EAGAIN)
log_print("error reading from sctp socket: %d", ret);
out_ret:
return ret;
}
/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
{
mm_segment_t fs;
int result = 0;
fs = get_fs();
set_fs(get_ds());
if (num == 1)
result = sctp_con.sock->ops->bind(sctp_con.sock,
(struct sockaddr *) addr,
addr_len);
else
result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
SCTP_SOCKOPT_BINDX_ADD,
(char *)addr, addr_len);
set_fs(fs);
if (result < 0)
log_print("Can't bind to port %d addr number %d",
dlm_config.ci_tcp_port, num);
return result;
}
static void init_local(void)
{
struct sockaddr_storage sas, *addr;
int i;
dlm_local_nodeid = dlm_our_nodeid();
for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
if (dlm_our_addr(&sas, i))
break;
addr = kmalloc(sizeof(*addr), GFP_KERNEL);
if (!addr)
break;
memcpy(addr, &sas, sizeof(*addr));
dlm_local_addr[dlm_local_count++] = addr;
}
}
/* Initialise SCTP socket and bind to all interfaces */
static int init_sock(void)
{
mm_segment_t fs;
struct socket *sock = NULL;
struct sockaddr_storage localaddr;
struct sctp_event_subscribe subscribe;
int result = -EINVAL, num = 1, i, addr_len;
if (!dlm_local_count) {
init_local();
if (!dlm_local_count) {
log_print("no local IP address has been set");
goto out;
}
}
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
IPPROTO_SCTP, &sock);
if (result < 0) {
log_print("Can't create comms socket, check SCTP is loaded");
goto out;
}
/* Listen for events */
memset(&subscribe, 0, sizeof(subscribe));
subscribe.sctp_data_io_event = 1;
subscribe.sctp_association_event = 1;
subscribe.sctp_send_failure_event = 1;
subscribe.sctp_shutdown_event = 1;
subscribe.sctp_partial_delivery_event = 1;
fs = get_fs();
set_fs(get_ds());
result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
(char *)&subscribe, sizeof(subscribe));
set_fs(fs);
if (result < 0) {
log_print("Failed to set SCTP_EVENTS on socket: result=%d",
result);
goto create_delsock;
}
/* Init con struct */
sock->sk->sk_user_data = &sctp_con;
sctp_con.sock = sock;
sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
/* Bind to all interfaces. */
for (i = 0; i < dlm_local_count; i++) {
memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
result = add_bind_addr(&localaddr, addr_len, num);
if (result)
goto create_delsock;
++num;
}
result = sock->ops->listen(sock, 5);
if (result < 0) {
log_print("Can't set socket listening");
goto create_delsock;
}
return 0;
create_delsock:
sock_release(sock);
sctp_con.sock = NULL;
out:
return result;
}
static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
{
struct writequeue_entry *entry;
entry = kmalloc(sizeof(struct writequeue_entry), allocation);
if (!entry)
return NULL;
entry->page = alloc_page(allocation);
if (!entry->page) {
kfree(entry);
return NULL;
}
entry->offset = 0;
entry->len = 0;
entry->end = 0;
entry->users = 0;
return entry;
}
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
struct writequeue_entry *e;
int offset = 0;
int users = 0;
struct nodeinfo *ni;
ni = nodeid2nodeinfo(nodeid, allocation);
if (!ni)
return NULL;
spin_lock(&ni->writequeue_lock);
e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
if ((&e->list == &ni->writequeue) ||
(PAGE_CACHE_SIZE - e->end < len)) {
e = NULL;
} else {
offset = e->end;
e->end += len;
users = e->users++;
}
spin_unlock(&ni->writequeue_lock);
if (e) {
got_one:
if (users == 0)
kmap(e->page);
*ppc = page_address(e->page) + offset;
return e;
}
e = new_writequeue_entry(allocation);
if (e) {
spin_lock(&ni->writequeue_lock);
offset = e->end;
e->end += len;
e->ni = ni;
users = e->users++;
list_add_tail(&e->list, &ni->writequeue);
spin_unlock(&ni->writequeue_lock);
goto got_one;
}
return NULL;
}
void dlm_lowcomms_commit_buffer(void *arg)
{
struct writequeue_entry *e = (struct writequeue_entry *) arg;
int users;
struct nodeinfo *ni = e->ni;
spin_lock(&ni->writequeue_lock);
users = --e->users;
if (users)
goto out;
e->len = e->end - e->offset;
kunmap(e->page);
spin_unlock(&ni->writequeue_lock);
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
return;
out:
spin_unlock(&ni->writequeue_lock);
return;
}
static void free_entry(struct writequeue_entry *e)
{
__free_page(e->page);
kfree(e);
}
/* Initiate an SCTP association. In theory we could just use sendmsg() on
the first IP address and it should work, but this allows us to set up the
association before sending any valuable data that we can't afford to lose.
It also keeps the send path clean as it can now always use the association ID */
static void initiate_association(int nodeid)
{
struct sockaddr_storage rem_addr;
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
struct msghdr outmessage;
struct cmsghdr *cmsg;
struct sctp_sndrcvinfo *sinfo;
int ret;
int addrlen;
char buf[1];
struct kvec iov[1];
struct nodeinfo *ni;
log_print("Initiating association with node %d", nodeid);
ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
if (!ni)
return;
if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
log_print("no address for nodeid %d", nodeid);
return;
}
make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
outmessage.msg_name = &rem_addr;
outmessage.msg_namelen = addrlen;
outmessage.msg_control = outcmsg;
outmessage.msg_controllen = sizeof(outcmsg);
outmessage.msg_flags = MSG_EOR;
iov[0].iov_base = buf;
iov[0].iov_len = 1;
/* Real INIT messages seem to cause trouble. Just send a 1 byte message
we can afford to lose */
cmsg = CMSG_FIRSTHDR(&outmessage);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
outmessage.msg_controllen = cmsg->cmsg_len;
ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
if (ret < 0) {
log_print("send INIT to node failed: %d", ret);
/* Try again later */
clear_bit(NI_INIT_PENDING, &ni->flags);
}
}
/* Send a message */
static void send_to_sock(struct nodeinfo *ni)
{
int ret = 0;
struct writequeue_entry *e;
int len, offset;
struct msghdr outmsg;
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
struct cmsghdr *cmsg;
struct sctp_sndrcvinfo *sinfo;
struct kvec iov;
/* See if we need to init an association before we start
sending precious messages */
spin_lock(&ni->lock);
if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
spin_unlock(&ni->lock);
initiate_association(ni->nodeid);
return;
}
spin_unlock(&ni->lock);
outmsg.msg_name = NULL; /* We use assoc_id */
outmsg.msg_namelen = 0;
outmsg.msg_control = outcmsg;
outmsg.msg_controllen = sizeof(outcmsg);
outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
cmsg = CMSG_FIRSTHDR(&outmsg);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
sinfo->sinfo_assoc_id = ni->assoc_id;
outmsg.msg_controllen = cmsg->cmsg_len;
spin_lock(&ni->writequeue_lock);
for (;;) {
if (list_empty(&ni->writequeue))
break;
e = list_entry(ni->writequeue.next, struct writequeue_entry,
list);
len = e->len;
offset = e->offset;
BUG_ON(len == 0 && e->users == 0);
spin_unlock(&ni->writequeue_lock);
kmap(e->page);
ret = 0;
if (len) {
iov.iov_base = page_address(e->page)+offset;
iov.iov_len = len;
ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
len);
if (ret == -EAGAIN) {
sctp_con.eagain_flag = 1;
goto out;
} else if (ret < 0)
goto send_error;
} else {
/* Don't starve people filling buffers */
cond_resched();
}
spin_lock(&ni->writequeue_lock);
e->offset += ret;
e->len -= ret;
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
kunmap(e->page);
free_entry(e);
continue;
}
}
spin_unlock(&ni->writequeue_lock);
out:
return;
send_error:
log_print("Error sending to node %d %d", ni->nodeid, ret);
spin_lock(&ni->lock);
if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
ni->assoc_id = 0;
spin_unlock(&ni->lock);
initiate_association(ni->nodeid);
} else
spin_unlock(&ni->lock);
return;
}
/* Try to send any messages that are pending */
static void process_output_queue(void)
{
struct list_head *list;
struct list_head *temp;
spin_lock_bh(&write_nodes_lock);
list_for_each_safe(list, temp, &write_nodes) {
struct nodeinfo *ni =
list_entry(list, struct nodeinfo, write_list);
clear_bit(NI_WRITE_PENDING, &ni->flags);
list_del(&ni->write_list);
spin_unlock_bh(&write_nodes_lock);
send_to_sock(ni);
spin_lock_bh(&write_nodes_lock);
}
spin_unlock_bh(&write_nodes_lock);
}
/* Called after we've had -EAGAIN and been woken up */
static void refill_write_queue(void)
{
int i;
for (i=1; i<=max_nodeid; i++) {
struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
if (ni) {
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
}
}
}
}
static void clean_one_writequeue(struct nodeinfo *ni)
{
struct list_head *list;
struct list_head *temp;
spin_lock(&ni->writequeue_lock);
list_for_each_safe(list, temp, &ni->writequeue) {
struct writequeue_entry *e =
list_entry(list, struct writequeue_entry, list);
list_del(&e->list);
free_entry(e);
}
spin_unlock(&ni->writequeue_lock);
}
static void clean_writequeues(void)
{
int i;
for (i=1; i<=max_nodeid; i++) {
struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
if (ni)
clean_one_writequeue(ni);
}
}
static void dealloc_nodeinfo(void)
{
int i;
for (i=1; i<=max_nodeid; i++) {
struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
if (ni) {
idr_remove(&nodeinfo_idr, i);
kfree(ni);
}
}
}
int dlm_lowcomms_close(int nodeid)
{
struct nodeinfo *ni;
ni = nodeid2nodeinfo(nodeid, 0);
if (!ni)
return -1;
spin_lock(&ni->lock);
if (ni->assoc_id) {
ni->assoc_id = 0;
/* Don't send shutdown here, sctp will just queue it
till the node comes back up! */
}
spin_unlock(&ni->lock);
clean_one_writequeue(ni);
clear_bit(NI_INIT_PENDING, &ni->flags);
return 0;
}
// PJC: The work queue function for receiving.
static void process_recv_sockets(struct work_struct *work)
{
if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
int ret;
int count = 0;
do {
ret = receive_from_sock();
/* Don't starve out everyone else */
if (++count >= MAX_RX_MSG_COUNT) {
cond_resched();
count = 0;
}
} while (!kthread_should_stop() && ret >=0);
}
cond_resched();
}
// PJC: the work queue function for sending
static void process_send_sockets(struct work_struct *work)
{
if (sctp_con.eagain_flag) {
sctp_con.eagain_flag = 0;
refill_write_queue();
}
process_output_queue();
}
// PJC: Process lock requests from a particular node.
// TODO: can we optimise this out on UP ??
static void process_lock_request(struct work_struct *work)
{
}
static void daemons_stop(void)
{
destroy_workqueue(recv_workqueue);
destroy_workqueue(send_workqueue);
destroy_workqueue(lock_workqueue);
}
static int daemons_start(void)
{
int error;
recv_workqueue = create_workqueue("dlm_recv");
error = IS_ERR(recv_workqueue);
if (error) {
log_print("can't start dlm_recv %d", error);
return error;
}
send_workqueue = create_singlethread_workqueue("dlm_send");
error = IS_ERR(send_workqueue);
if (error) {
log_print("can't start dlm_send %d", error);
destroy_workqueue(recv_workqueue);
return error;
}
lock_workqueue = create_workqueue("dlm_rlock");
error = IS_ERR(lock_workqueue);
if (error) {
log_print("can't start dlm_rlock %d", error);
destroy_workqueue(send_workqueue);
destroy_workqueue(recv_workqueue);
return error;
}
return 0;
}
/*
* This is quite likely to sleep...
*/
int dlm_lowcomms_start(void)
{
int error;
INIT_WORK(&sctp_con.work, process_recv_sockets);
error = init_sock();
if (error)
goto fail_sock;
error = daemons_start();
if (error)
goto fail_sock;
return 0;
fail_sock:
close_connection();
return error;
}
void dlm_lowcomms_stop(void)
{
int i;
sctp_con.flags = 0x7;
daemons_stop();
clean_writequeues();
close_connection();
dealloc_nodeinfo();
max_nodeid = 0;
dlm_local_count = 0;
dlm_local_nodeid = 0;
for (i = 0; i < dlm_local_count; i++)
kfree(dlm_local_addr[i]);
}
......@@ -36,30 +36,36 @@
* of high load. Also, this way, the sending thread can collect together
* messages bound for one node and send them in one block.
*
* I don't see any problem with the recv thread executing the locking
* code on behalf of remote processes as the locking code is
* short, efficient and never waits.
* lowcomms will choose to use wither TCP or SCTP as its transport layer
* depending on the configuration variable 'protocol'. This should be set
* to 0 (default) for TCP or 1 for SCTP. It shouldbe configured using a
* cluster-wide mechanism as it must be the same on all nodes of the cluster
* for the DLM to function.
*
*/
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/idr.h>
#include <linux/file.h>
#include <linux/sctp.h>
#include <net/sctp/user.h>
#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"
#define NEEDED_RMEM (4*1024*1024)
struct cbuf {
unsigned int base;
unsigned int len;
unsigned int mask;
};
#define NODE_INCREMENT 32
static void cbuf_add(struct cbuf *cb, int n)
{
cb->len += n;
......@@ -88,28 +94,25 @@ static bool cbuf_empty(struct cbuf *cb)
return cb->len == 0;
}
/* Maximum number of incoming messages to process before
doing a cond_resched()
*/
#define MAX_RX_MSG_COUNT 25
struct connection {
struct socket *sock; /* NULL if not connected */
uint32_t nodeid; /* So we know who we are in the list */
struct mutex sock_mutex;
unsigned long flags; /* bit 1,2 = We are on the read/write lists */
unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
struct list_head writequeue; /* List of outgoing writequeue_entries */
struct list_head listenlist; /* List of allocated listening sockets */
spinlock_t writequeue_lock;
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */
struct page *rx_page;
struct cbuf cb;
int retries;
#define MAX_CONNECT_RETRIES 3
int sctp_assoc;
struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
......@@ -127,47 +130,54 @@ struct writequeue_entry {
struct connection *con;
};
static struct sockaddr_storage dlm_local_addr;
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;
/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static DEFINE_IDR(connections_idr);
static DECLARE_MUTEX(connections_lock);
static int max_nodeid;
static struct kmem_cache *con_cache;
static int conn_array_size;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
/*
* If 'allocation' is zero then we don't attempt to create a new
* connection structure for this node.
*/
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
struct connection *con = NULL;
int r;
int n;
down(&connections_lock);
if (nodeid >= conn_array_size) {
int new_size = nodeid + NODE_INCREMENT;
struct connection **new_conns;
con = idr_find(&connections_idr, nodeid);
if (con || !alloc)
return con;
new_conns = kzalloc(sizeof(struct connection *) *
new_size, allocation);
if (!new_conns)
goto finish;
r = idr_pre_get(&connections_idr, alloc);
if (!r)
return NULL;
memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
conn_array_size = new_size;
kfree(connections);
connections = new_conns;
con = kmem_cache_zalloc(con_cache, alloc);
if (!con)
return NULL;
r = idr_get_new_above(&connections_idr, con, nodeid, &n);
if (r) {
kmem_cache_free(con_cache, con);
return NULL;
}
con = connections[nodeid];
if (con == NULL && allocation) {
con = kmem_cache_zalloc(con_cache, allocation);
if (!con)
goto finish;
if (n != nodeid) {
idr_remove(&connections_idr, n);
kmem_cache_free(con_cache, con);
return NULL;
}
con->nodeid = nodeid;
mutex_init(&con->sock_mutex);
......@@ -176,19 +186,80 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
connections[nodeid] = con;
/* Setup action pointers for child sockets */
if (con->nodeid) {
struct connection *zerocon = idr_find(&connections_idr, 0);
con->connect_action = zerocon->connect_action;
if (!con->rx_action)
con->rx_action = zerocon->rx_action;
}
finish:
if (nodeid > max_nodeid)
max_nodeid = nodeid;
return con;
}
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
struct connection *con;
down(&connections_lock);
con = __nodeid2con(nodeid, allocation);
up(&connections_lock);
return con;
}
/* This is a bit drastic, but only called when things go wrong */
static struct connection *assoc2con(int assoc_id)
{
int i;
struct connection *con;
down(&connections_lock);
for (i=0; i<max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (con && con->sctp_assoc == assoc_id) {
up(&connections_lock);
return con;
}
}
up(&connections_lock);
return NULL;
}
static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
{
struct sockaddr_storage addr;
int error;
if (!dlm_local_count)
return -1;
error = dlm_nodeid_to_addr(nodeid, &addr);
if (error)
return error;
if (dlm_local_addr[0]->ss_family == AF_INET) {
struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
} else {
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
memcpy(&ret6->sin6_addr, &in6->sin6_addr,
sizeof(in6->sin6_addr));
}
return 0;
}
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
struct connection *con = sock2con(sk);
if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
queue_work(recv_workqueue, &con->rwork);
}
......@@ -222,20 +293,21 @@ static int add_sock(struct socket *sock, struct connection *con)
con->sock->sk->sk_data_ready = lowcomms_data_ready;
con->sock->sk->sk_write_space = lowcomms_write_space;
con->sock->sk->sk_state_change = lowcomms_state_change;
con->sock->sk->sk_user_data = con;
return 0;
}
/* Add the port number to an IP6 or 4 sockaddr and return the address
/* Add the port number to an IPv6 or 4 sockaddr and return the address
length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
int *addr_len)
{
saddr->ss_family = dlm_local_addr.ss_family;
saddr->ss_family = dlm_local_addr[0]->ss_family;
if (saddr->ss_family == AF_INET) {
struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
in4_addr->sin_port = cpu_to_be16(port);
*addr_len = sizeof(struct sockaddr_in);
memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
} else {
struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
in6_addr->sin6_port = cpu_to_be16(port);
......@@ -264,6 +336,192 @@ static void close_connection(struct connection *con, bool and_other)
mutex_unlock(&con->sock_mutex);
}
/* We only send shutdown messages to nodes that are not part of the cluster */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
struct msghdr outmessage;
struct cmsghdr *cmsg;
struct sctp_sndrcvinfo *sinfo;
int ret;
struct connection *con;
con = nodeid2con(0,0);
BUG_ON(con == NULL);
outmessage.msg_name = NULL;
outmessage.msg_namelen = 0;
outmessage.msg_control = outcmsg;
outmessage.msg_controllen = sizeof(outcmsg);
outmessage.msg_flags = MSG_EOR;
cmsg = CMSG_FIRSTHDR(&outmessage);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
outmessage.msg_controllen = cmsg->cmsg_len;
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_flags |= MSG_EOF;
sinfo->sinfo_assoc_id = associd;
ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
if (ret != 0)
log_print("send EOF to node failed: %d", ret);
}
/* INIT failed but we don't know which node...
restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
int i;
struct connection *con;
down(&connections_lock);
for (i=1; i<=max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (!con)
continue;
con->sctp_assoc = 0;
if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
queue_work(send_workqueue, &con->swork);
}
}
}
up(&connections_lock);
}
/* Something happened to an association */
static void process_sctp_notification(struct connection *con, struct msghdr *msg, char *buf)
{
union sctp_notification *sn = (union sctp_notification *)buf;
if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
switch (sn->sn_assoc_change.sac_state) {
case SCTP_COMM_UP:
case SCTP_RESTART:
{
/* Check that the new node is in the lockspace */
struct sctp_prim prim;
int nodeid;
int prim_len, ret;
int addr_len;
struct connection *new_con;
struct file *file;
sctp_peeloff_arg_t parg;
int parglen = sizeof(parg);
/*
* We get this before any data for an association.
* We verify that the node is in the cluster and
* then peel off a socket for it.
*/
if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
log_print("COMM_UP for invalid assoc ID %d",
(int)sn->sn_assoc_change.sac_assoc_id);
sctp_init_failed();
return;
}
memset(&prim, 0, sizeof(struct sctp_prim));
prim_len = sizeof(struct sctp_prim);
prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
ret = kernel_getsockopt(con->sock,
IPPROTO_SCTP,
SCTP_PRIMARY_ADDR,
(char*)&prim,
&prim_len);
if (ret < 0) {
log_print("getsockopt/sctp_primary_addr on "
"new assoc %d failed : %d",
(int)sn->sn_assoc_change.sac_assoc_id,
ret);
/* Retry INIT later */
new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
if (new_con)
clear_bit(CF_CONNECT_PENDING, &con->flags);
return;
}
make_sockaddr(&prim.ssp_addr, 0, &addr_len);
if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
int i;
unsigned char *b=(unsigned char *)&prim.ssp_addr;
log_print("reject connect from unknown addr");
for (i=0; i<sizeof(struct sockaddr_storage);i++)
printk("%02x ", b[i]);
printk("\n");
sctp_send_shutdown(prim.ssp_assoc_id);
return;
}
new_con = nodeid2con(nodeid, GFP_KERNEL);
if (!new_con)
return;
/* Peel off a new sock */
parg.associd = sn->sn_assoc_change.sac_assoc_id;
ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, SCTP_SOCKOPT_PEELOFF,
(void *)&parg, &parglen);
if (ret < 0) {
log_print("Can't peel off a socket for connection %d to node %d: err=%d\n",
parg.associd, nodeid, ret);
return;
}
file = fget(parg.sd);
new_con->sock = SOCKET_I(file->f_dentry->d_inode);
add_sock(new_con->sock, new_con);
fput(file);
put_unused_fd(parg.sd);
log_print("got new/restarted association %d nodeid %d",
(int)sn->sn_assoc_change.sac_assoc_id, nodeid);
/* Send any pending writes */
clear_bit(CF_CONNECT_PENDING, &new_con->flags);
clear_bit(CF_INIT_PENDING, &con->flags);
if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
queue_work(send_workqueue, &new_con->swork);
}
if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
queue_work(recv_workqueue, &new_con->rwork);
}
break;
case SCTP_COMM_LOST:
case SCTP_SHUTDOWN_COMP:
{
con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
if (con) {
con->sctp_assoc = 0;
}
}
break;
/* We don't know which INIT failed, so clear the PENDING flags
* on them all. if assoc_id is zero then it will then try
* again */
case SCTP_CANT_STR_ASSOC:
{
log_print("Can't start SCTP association - retrying");
sctp_init_failed();
}
break;
default:
log_print("unexpected SCTP assoc change id=%d state=%d",
(int)sn->sn_assoc_change.sac_assoc_id,
sn->sn_assoc_change.sac_state);
}
}
}
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
......@@ -274,6 +532,7 @@ static int receive_from_sock(struct connection *con)
int r;
int call_again_soon = 0;
int nvec;
char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
mutex_lock(&con->sock_mutex);
......@@ -293,6 +552,11 @@ static int receive_from_sock(struct connection *con)
cbuf_init(&con->cb, PAGE_CACHE_SIZE);
}
/* Only SCTP needs these really */
memset(&incmsg, 0, sizeof(incmsg));
msg.msg_control = incmsg;
msg.msg_controllen = sizeof(incmsg);
/*
* iov[0] is the bit of the circular buffer between the current end
* point (cb.base + cb.len) and the end of the buffer.
......@@ -316,10 +580,22 @@ static int receive_from_sock(struct connection *con)
r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
MSG_DONTWAIT | MSG_NOSIGNAL);
if (ret <= 0)
goto out_close;
/* Process SCTP notifications */
if (msg.msg_flags & MSG_NOTIFICATION) {
BUG_ON(con->nodeid != 0);
msg.msg_control = incmsg;
msg.msg_controllen = sizeof(incmsg);
process_sctp_notification(con, &msg,
page_address(con->rx_page) + con->cb.base);
mutex_unlock(&con->sock_mutex);
return 0;
}
BUG_ON(con->nodeid == 0);
if (ret == len)
call_again_soon = 1;
cbuf_add(&con->cb, ret);
......@@ -367,7 +643,7 @@ static int receive_from_sock(struct connection *con)
}
/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
static int tcp_accept_from_sock(struct connection *con)
{
int result;
struct sockaddr_storage peeraddr;
......@@ -378,7 +654,7 @@ static int accept_from_sock(struct connection *con)
struct connection *addcon;
memset(&peeraddr, 0, sizeof(peeraddr));
result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
IPPROTO_TCP, &newsock);
if (result < 0)
return -ENOMEM;
......@@ -418,7 +694,6 @@ static int accept_from_sock(struct connection *con)
/* Check to see if we already have a connection to this node. This
* could happen if the two nodes initiate a connection at roughly
* the same time and the connections cross on the wire.
* TEMPORARY FIX:
* In this case we store the incoming one in "othercon"
*/
newcon = nodeid2con(nodeid, GFP_KERNEL);
......@@ -480,8 +755,102 @@ static int accept_from_sock(struct connection *con)
return result;
}
static void free_entry(struct writequeue_entry *e)
{
__free_page(e->page);
kfree(e);
}
/* Initiate an SCTP association.
This is a special case of send_to_sock() in that we don't yet have a
peeled-off socket for this association, so we use the listening socket
and add the primary IP address of the remote node.
*/
static void sctp_init_assoc(struct connection *con)
{
struct sockaddr_storage rem_addr;
char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
struct msghdr outmessage;
struct cmsghdr *cmsg;
struct sctp_sndrcvinfo *sinfo;
struct connection *base_con;
struct writequeue_entry *e;
int len, offset;
int ret;
int addrlen;
struct kvec iov[1];
if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
return;
if (con->retries++ > MAX_CONNECT_RETRIES)
return;
log_print("Initiating association with node %d", con->nodeid);
if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
log_print("no address for nodeid %d", con->nodeid);
return;
}
base_con = nodeid2con(0, 0);
BUG_ON(base_con == NULL);
make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
outmessage.msg_name = &rem_addr;
outmessage.msg_namelen = addrlen;
outmessage.msg_control = outcmsg;
outmessage.msg_controllen = sizeof(outcmsg);
outmessage.msg_flags = MSG_EOR;
spin_lock(&con->writequeue_lock);
e = list_entry(con->writequeue.next, struct writequeue_entry,
list);
BUG_ON((struct list_head *) e == &con->writequeue);
len = e->len;
offset = e->offset;
spin_unlock(&con->writequeue_lock);
kmap(e->page);
/* Send the first block off the write queue */
iov[0].iov_base = page_address(e->page)+offset;
iov[0].iov_len = len;
cmsg = CMSG_FIRSTHDR(&outmessage);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
outmessage.msg_controllen = cmsg->cmsg_len;
ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
if (ret < 0) {
log_print("Send first packet to node %d failed: %d", con->nodeid, ret);
/* Try again later */
clear_bit(CF_CONNECT_PENDING, &con->flags);
clear_bit(CF_INIT_PENDING, &con->flags);
}
else {
spin_lock(&con->writequeue_lock);
e->offset += ret;
e->len -= ret;
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
kunmap(e->page);
free_entry(e);
}
spin_unlock(&con->writequeue_lock);
}
}
/* Connect a new socket to its peer */
static void connect_to_sock(struct connection *con)
static void tcp_connect_to_sock(struct connection *con)
{
int result = -EHOSTUNREACH;
struct sockaddr_storage saddr;
......@@ -504,7 +873,7 @@ static void connect_to_sock(struct connection *con)
}
/* Create a socket to communicate with */
result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
IPPROTO_TCP, &sock);
if (result < 0)
goto out_err;
......@@ -515,11 +884,11 @@ static void connect_to_sock(struct connection *con)
sock->sk->sk_user_data = con;
con->rx_action = receive_from_sock;
con->connect_action = tcp_connect_to_sock;
add_sock(sock, con);
make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
add_sock(sock, con);
log_print("connecting to %d", con->nodeid);
result =
sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
......@@ -549,38 +918,36 @@ static void connect_to_sock(struct connection *con)
return;
}
static struct socket *create_listen_sock(struct connection *con,
static struct socket *tcp_create_listen_sock(struct connection *con,
struct sockaddr_storage *saddr)
{
struct socket *sock = NULL;
mm_segment_t fs;
int result = 0;
int one = 1;
int addr_len;
if (dlm_local_addr.ss_family == AF_INET)
if (dlm_local_addr[0]->ss_family == AF_INET)
addr_len = sizeof(struct sockaddr_in);
else
addr_len = sizeof(struct sockaddr_in6);
/* Create a socket to communicate with */
result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
if (result < 0) {
printk("dlm: Can't create listening comms socket\n");
goto create_out;
}
fs = get_fs();
set_fs(get_ds());
result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(char *)&one, sizeof(one));
set_fs(fs);
if (result < 0) {
printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
result);
}
sock->sk->sk_user_data = con;
con->rx_action = accept_from_sock;
con->rx_action = tcp_accept_from_sock;
con->connect_action = tcp_connect_to_sock;
con->sock = sock;
/* Bind to our port */
......@@ -593,13 +960,8 @@ static struct socket *create_listen_sock(struct connection *con,
con->sock = NULL;
goto create_out;
}
fs = get_fs();
set_fs(get_ds());
result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
(char *)&one, sizeof(one));
set_fs(fs);
if (result < 0) {
printk("dlm: Set keepalive failed: %d\n", result);
}
......@@ -616,18 +978,141 @@ static struct socket *create_listen_sock(struct connection *con,
return sock;
}
/* Get local addresses */
static void init_local(void)
{
struct sockaddr_storage sas, *addr;
int i;
for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
if (dlm_our_addr(&sas, i))
break;
addr = kmalloc(sizeof(*addr), GFP_KERNEL);
if (!addr)
break;
memcpy(addr, &sas, sizeof(*addr));
dlm_local_addr[dlm_local_count++] = addr;
}
}
/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
static int add_sctp_bind_addr(struct connection *sctp_con, struct sockaddr_storage *addr, int addr_len, int num)
{
int result = 0;
if (num == 1)
result = kernel_bind(sctp_con->sock,
(struct sockaddr *) addr,
addr_len);
else
result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
SCTP_SOCKOPT_BINDX_ADD,
(char *)addr, addr_len);
if (result < 0)
log_print("Can't bind to port %d addr number %d",
dlm_config.ci_tcp_port, num);
return result;
}
/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
struct socket *sock = NULL;
struct sockaddr_storage localaddr;
struct sctp_event_subscribe subscribe;
int result = -EINVAL, num = 1, i, addr_len;
struct connection *con = nodeid2con(0, GFP_KERNEL);
int bufsize = NEEDED_RMEM;
if (!con)
return -ENOMEM;
log_print("Using SCTP for communications");
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
IPPROTO_SCTP, &sock);
if (result < 0) {
log_print("Can't create comms socket, check SCTP is loaded");
goto out;
}
/* Listen for events */
memset(&subscribe, 0, sizeof(subscribe));
subscribe.sctp_data_io_event = 1;
subscribe.sctp_association_event = 1;
subscribe.sctp_send_failure_event = 1;
subscribe.sctp_shutdown_event = 1;
subscribe.sctp_partial_delivery_event = 1;
result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
(char *)&bufsize, sizeof(bufsize));
if (result)
log_print("Error increasing buffer space on socket: %d", result);
result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
(char *)&subscribe, sizeof(subscribe));
if (result < 0) {
log_print("Failed to set SCTP_EVENTS on socket: result=%d",
result);
goto create_delsock;
}
/* Init con struct */
sock->sk->sk_user_data = con;
con->sock = sock;
con->sock->sk->sk_data_ready = lowcomms_data_ready;
con->rx_action = receive_from_sock;
con->connect_action = sctp_init_assoc;
/* Bind to all interfaces. */
for (i = 0; i < dlm_local_count; i++) {
memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
if (result)
goto create_delsock;
++num;
}
result = sock->ops->listen(sock, 5);
if (result < 0) {
log_print("Can't set socket listening");
goto create_delsock;
}
return 0;
create_delsock:
sock_release(sock);
con->sock = NULL;
out:
return result;
}
/* Listen on all interfaces */
static int listen_for_all(void)
static int tcp_listen_for_all(void)
{
struct socket *sock = NULL;
struct connection *con = nodeid2con(0, GFP_KERNEL);
int result = -EINVAL;
if (!con)
return -ENOMEM;
/* We don't support multi-homed hosts */
if (dlm_local_addr[1] != NULL) {
log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
return -EINVAL;
}
log_print("Using TCP for communications");
set_bit(CF_IS_OTHERCON, &con->flags);
sock = create_listen_sock(con, &dlm_local_addr);
sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
if (sock) {
add_sock(sock, con);
result = 0;
......@@ -734,12 +1219,6 @@ void dlm_lowcomms_commit_buffer(void *mh)
return;
}
static void free_entry(struct writequeue_entry *e)
{
__free_page(e->page);
kfree(e);
}
/* Send a message */
static void send_to_sock(struct connection *con)
{
......@@ -806,7 +1285,8 @@ static void send_to_sock(struct connection *con)
out_connect:
mutex_unlock(&con->sock_mutex);
connect_to_sock(con);
if (!test_bit(CF_INIT_PENDING, &con->flags))
lowcomms_connect_sock(con);
return;
}
......@@ -831,9 +1311,6 @@ int dlm_lowcomms_close(int nodeid)
{
struct connection *con;
if (!connections)
goto out;
log_print("closing connection to node %d", nodeid);
con = nodeid2con(nodeid, 0);
if (con) {
......@@ -841,12 +1318,9 @@ int dlm_lowcomms_close(int nodeid)
close_connection(con, true);
}
return 0;
out:
return -1;
}
/* Look for activity on active sockets */
/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, rwork);
......@@ -858,15 +1332,14 @@ static void process_recv_sockets(struct work_struct *work)
} while (!err);
}
/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, swork);
if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
connect_to_sock(con);
con->connect_action(con);
}
clear_bit(CF_WRITE_PENDING, &con->flags);
send_to_sock(con);
}
......@@ -877,7 +1350,7 @@ static void clean_writequeues(void)
{
int nodeid;
for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
for (nodeid = 1; nodeid < max_nodeid; nodeid++) {
struct connection *con = nodeid2con(nodeid, 0);
if (con)
......@@ -915,64 +1388,64 @@ static int work_start(void)
void dlm_lowcomms_stop(void)
{
int i;
struct connection *con;
/* Set all the flags to prevent any
socket activity.
*/
for (i = 0; i < conn_array_size; i++) {
if (connections[i])
connections[i]->flags |= 0xFF;
down(&connections_lock);
for (i = 0; i < max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (con)
con->flags |= 0xFF;
}
up(&connections_lock);
work_stop();
down(&connections_lock);
clean_writequeues();
for (i = 0; i < conn_array_size; i++) {
if (connections[i]) {
close_connection(connections[i], true);
if (connections[i]->othercon)
kmem_cache_free(con_cache, connections[i]->othercon);
kmem_cache_free(con_cache, connections[i]);
for (i = 0; i < max_nodeid; i++) {
con = nodeid2con(i, 0);
if (con) {
close_connection(con, true);
if (con->othercon)
kmem_cache_free(con_cache, con->othercon);
kmem_cache_free(con_cache, con);
}
}
kfree(connections);
connections = NULL;
up(&connections_lock);
kmem_cache_destroy(con_cache);
}
/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
int error = 0;
error = -ENOMEM;
connections = kzalloc(sizeof(struct connection *) *
NODE_INCREMENT, GFP_KERNEL);
if (!connections)
goto out;
conn_array_size = NODE_INCREMENT;
int error = -EINVAL;
struct connection *con;
if (dlm_our_addr(&dlm_local_addr, 0)) {
init_local();
if (!dlm_local_count) {
log_print("no local IP address has been set");
goto fail_free_conn;
}
if (!dlm_our_addr(&dlm_local_addr, 1)) {
log_print("This dlm comms module does not support multi-homed clustering");
goto fail_free_conn;
goto out;
}
error = -ENOMEM;
con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
__alignof__(struct connection), 0,
NULL, NULL);
if (!con_cache)
goto fail_free_conn;
goto out;
/* Set some sysctl minima */
if (sysctl_rmem_max < NEEDED_RMEM)
sysctl_rmem_max = NEEDED_RMEM;
/* Start listening */
error = listen_for_all();
if (dlm_config.ci_protocol == 0)
error = tcp_listen_for_all();
else
error = sctp_listen_for_all();
if (error)
goto fail_unlisten;
......@@ -983,24 +1456,13 @@ int dlm_lowcomms_start(void)
return 0;
fail_unlisten:
close_connection(connections[0], false);
kmem_cache_free(con_cache, connections[0]);
con = nodeid2con(0,0);
if (con) {
close_connection(con, false);
kmem_cache_free(con_cache, con);
}
kmem_cache_destroy(con_cache);
fail_free_conn:
kfree(connections);
out:
return error;
}
/*
* Overrides for Emacs so that we follow Linus's tabbing style.
* Emacs will notice this stuff at the end of the file and automatically
* adjust the settings for this buffer only. This must remain at the end
* of the file.
* ---------------------------------------------------------------------------
* Local variables:
* c-file-style: "linux"
* End:
*/
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment