Commit 4798cbbf authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

fs: dlm: rework receive handling

This patch reworks the current receive handling of dlm. As I tried to
change the send handling to fix reorder issues I took a look into the
receive handling and simplified it, it works as the following:

Each connection has a preallocated receive buffer with a minimum length of
4096. On receive, the upper layer protocol will process all dlm message
until there is not enough data anymore. If "leftover" data exists at
the end of the receive buffer because a dlm message wasn't fully received,
it will be copied to the beginning of the preallocated receive buffer. On
the next receive, more data will be appended to the previous "leftover"
data and processing will begin again.

This removes a lot of code from the current mechanism. Inside the
processing functionality we ensure with a memmove() that the dlm
message is memory aligned. Having a dlm message always start at
the beginning of the buffer also reduces the number of memmove()
calls, because the src and dest pointers are then the same.

The cluster attribute "buffer_size" takes on a new meaning: it is now the
size of the application layer receive buffer. If this is changed during
runtime the receive buffer will be reallocated. It is important that the
receive buffer size is at least the maximum possible dlm
message size, otherwise a received message cannot be placed inside
the receive buffer.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 4e192ee6
...@@ -166,7 +166,6 @@ static bool dlm_check_zero(unsigned int x) ...@@ -166,7 +166,6 @@ static bool dlm_check_zero(unsigned int x)
return !x; return !x;
} }
#define DEFAULT_BUFFER_SIZE 4096
static bool dlm_check_buffer_size(unsigned int x) static bool dlm_check_buffer_size(unsigned int x)
{ {
return (x < DEFAULT_BUFFER_SIZE); return (x < DEFAULT_BUFFER_SIZE);
......
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
#ifndef __CONFIG_DOT_H__ #ifndef __CONFIG_DOT_H__
#define __CONFIG_DOT_H__ #define __CONFIG_DOT_H__
#define DEFAULT_BUFFER_SIZE 4096
struct dlm_config_node { struct dlm_config_node {
int nodeid; int nodeid;
int weight; int weight;
......
...@@ -65,40 +65,6 @@ ...@@ -65,40 +65,6 @@
#define MAX_SEND_MSG_COUNT 25 #define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct cbuf {
unsigned int base;
unsigned int len;
unsigned int mask;
};
static void cbuf_add(struct cbuf *cb, int n)
{
cb->len += n;
}
static int cbuf_data(struct cbuf *cb)
{
return ((cb->base + cb->len) & cb->mask);
}
static void cbuf_init(struct cbuf *cb, int size)
{
cb->base = cb->len = 0;
cb->mask = size-1;
}
static void cbuf_eat(struct cbuf *cb, int n)
{
cb->len -= n;
cb->base += n;
cb->base &= cb->mask;
}
static bool cbuf_empty(struct cbuf *cb)
{
return cb->len == 0;
}
struct connection { struct connection {
struct socket *sock; /* NULL if not connected */ struct socket *sock; /* NULL if not connected */
uint32_t nodeid; /* So we know who we are in the list */ uint32_t nodeid; /* So we know who we are in the list */
...@@ -117,8 +83,6 @@ struct connection { ...@@ -117,8 +83,6 @@ struct connection {
int (*rx_action) (struct connection *); /* What to do when active */ int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */ void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
struct page *rx_page;
struct cbuf cb;
int retries; int retries;
#define MAX_CONNECT_RETRIES 3 #define MAX_CONNECT_RETRIES 3
struct hlist_node list; struct hlist_node list;
...@@ -126,6 +90,9 @@ struct connection { ...@@ -126,6 +90,9 @@ struct connection {
struct work_struct rwork; /* Receive workqueue */ struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */ struct work_struct swork; /* Send workqueue */
wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
unsigned char *rx_buf;
int rx_buflen;
int rx_leftover;
struct rcu_head rcu; struct rcu_head rcu;
}; };
#define sock2con(x) ((struct connection *)(x)->sk_user_data) #define sock2con(x) ((struct connection *)(x)->sk_user_data)
...@@ -219,6 +186,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) ...@@ -219,6 +186,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
if (!con) if (!con)
return NULL; return NULL;
con->rx_buflen = dlm_config.ci_buffer_size;
con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
if (!con->rx_buf) {
kfree(con);
return NULL;
}
con->nodeid = nodeid; con->nodeid = nodeid;
mutex_init(&con->sock_mutex); mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue); INIT_LIST_HEAD(&con->writequeue);
...@@ -613,11 +587,8 @@ static void close_connection(struct connection *con, bool and_other, ...@@ -613,11 +587,8 @@ static void close_connection(struct connection *con, bool and_other,
/* Will only re-enter once. */ /* Will only re-enter once. */
close_connection(con->othercon, false, true, true); close_connection(con->othercon, false, true, true);
} }
if (con->rx_page) {
__free_page(con->rx_page);
con->rx_page = NULL;
}
con->rx_leftover = 0;
con->retries = 0; con->retries = 0;
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags); clear_bit(CF_CLOSING, &con->flags);
...@@ -671,16 +642,33 @@ static void dlm_tcp_shutdown(struct connection *con) ...@@ -671,16 +642,33 @@ static void dlm_tcp_shutdown(struct connection *con)
shutdown_connection(con); shutdown_connection(con);
} }
static int con_realloc_receive_buf(struct connection *con, int newlen)
{
unsigned char *newbuf;
newbuf = kmalloc(newlen, GFP_NOFS);
if (!newbuf)
return -ENOMEM;
/* copy any leftover from last receive */
if (con->rx_leftover)
memmove(newbuf, con->rx_buf, con->rx_leftover);
/* swap to new buffer space */
kfree(con->rx_buf);
con->rx_buflen = newlen;
con->rx_buf = newbuf;
return 0;
}
/* Data received from remote end */ /* Data received from remote end */
static int receive_from_sock(struct connection *con) static int receive_from_sock(struct connection *con)
{ {
int ret = 0;
struct msghdr msg = {};
struct kvec iov[2];
unsigned len;
int r;
int call_again_soon = 0; int call_again_soon = 0;
int nvec; struct msghdr msg;
struct kvec iov;
int ret, buflen;
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
...@@ -688,71 +676,55 @@ static int receive_from_sock(struct connection *con) ...@@ -688,71 +676,55 @@ static int receive_from_sock(struct connection *con)
ret = -EAGAIN; ret = -EAGAIN;
goto out_close; goto out_close;
} }
if (con->nodeid == 0) { if (con->nodeid == 0) {
ret = -EINVAL; ret = -EINVAL;
goto out_close; goto out_close;
} }
if (con->rx_page == NULL) { /* realloc if we get new buffer size to read out */
/* buflen = dlm_config.ci_buffer_size;
* This doesn't need to be atomic, but I think it should if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
* improve performance if it is. ret = con_realloc_receive_buf(con, buflen);
*/ if (ret < 0)
con->rx_page = alloc_page(GFP_ATOMIC);
if (con->rx_page == NULL)
goto out_resched; goto out_resched;
cbuf_init(&con->cb, PAGE_SIZE);
} }
/* /* calculate new buffer parameter regarding last receive and
* iov[0] is the bit of the circular buffer between the current end * possible leftover bytes
* point (cb.base + cb.len) and the end of the buffer.
*/ */
iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); iov.iov_base = con->rx_buf + con->rx_leftover;
iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); iov.iov_len = con->rx_buflen - con->rx_leftover;
iov[1].iov_len = 0;
nvec = 1;
/* memset(&msg, 0, sizeof(msg));
* iov[1] is the bit of the circular buffer between the start of the msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
* buffer and the start of the currently used section (cb.base) ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
*/ msg.msg_flags);
if (cbuf_data(&con->cb) >= con->cb.base) {
iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
iov[1].iov_len = con->cb.base;
iov[1].iov_base = page_address(con->rx_page);
nvec = 2;
}
len = iov[0].iov_len + iov[1].iov_len;
iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len);
r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
if (ret <= 0) if (ret <= 0)
goto out_close; goto out_close;
else if (ret == len) else if (ret == iov.iov_len)
call_again_soon = 1; call_again_soon = 1;
cbuf_add(&con->cb, ret); /* new buflen according readed bytes and leftover from last receive */
ret = dlm_process_incoming_buffer(con->nodeid, buflen = ret + con->rx_leftover;
page_address(con->rx_page), ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
con->cb.base, con->cb.len, if (ret < 0)
PAGE_SIZE); goto out_close;
if (ret < 0) {
log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
ret, page_address(con->rx_page), con->cb.base,
con->cb.len, r);
cbuf_eat(&con->cb, r);
} else {
cbuf_eat(&con->cb, ret);
}
if (cbuf_empty(&con->cb) && !call_again_soon) { /* calculate leftover bytes from process and put it into begin of
__free_page(con->rx_page); * the receive buffer, so next receive we have the full message
con->rx_page = NULL; * at the start address of the receive buffer.
*/
con->rx_leftover = buflen - ret;
if (con->rx_leftover) {
memmove(con->rx_buf, con->rx_buf + ret,
con->rx_leftover);
call_again_soon = true;
} }
if (call_again_soon) if (call_again_soon)
goto out_resched; goto out_resched;
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
return 0; return 0;
...@@ -854,6 +826,17 @@ static int accept_from_sock(struct connection *con) ...@@ -854,6 +826,17 @@ static int accept_from_sock(struct connection *con)
result = -ENOMEM; result = -ENOMEM;
goto accept_err; goto accept_err;
} }
othercon->rx_buflen = dlm_config.ci_buffer_size;
othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
if (!othercon->rx_buf) {
mutex_unlock(&newcon->sock_mutex);
kfree(othercon);
log_print("failed to allocate incoming socket receive buffer");
result = -ENOMEM;
goto accept_err;
}
othercon->nodeid = nodeid; othercon->nodeid = nodeid;
othercon->rx_action = receive_from_sock; othercon->rx_action = receive_from_sock;
mutex_init(&othercon->sock_mutex); mutex_init(&othercon->sock_mutex);
...@@ -1603,6 +1586,14 @@ static void shutdown_conn(struct connection *con) ...@@ -1603,6 +1586,14 @@ static void shutdown_conn(struct connection *con)
con->shutdown_action(con); con->shutdown_action(con);
} }
static void connection_release(struct rcu_head *rcu)
{
struct connection *con = container_of(rcu, struct connection, rcu);
kfree(con->rx_buf);
kfree(con);
}
static void free_conn(struct connection *con) static void free_conn(struct connection *con)
{ {
close_connection(con, true, true, true); close_connection(con, true, true, true);
...@@ -1611,10 +1602,10 @@ static void free_conn(struct connection *con) ...@@ -1611,10 +1602,10 @@ static void free_conn(struct connection *con)
spin_unlock(&connections_lock); spin_unlock(&connections_lock);
if (con->othercon) { if (con->othercon) {
clean_one_writequeue(con->othercon); clean_one_writequeue(con->othercon);
kfree_rcu(con->othercon, rcu); call_rcu(&con->othercon->rcu, connection_release);
} }
clean_one_writequeue(con); clean_one_writequeue(con);
kfree_rcu(con, rcu); call_rcu(&con->rcu, connection_release);
} }
static void work_flush(void) static void work_flush(void)
......
...@@ -22,114 +22,84 @@ ...@@ -22,114 +22,84 @@
* into packets and sends them to the comms layer. * into packets and sends them to the comms layer.
*/ */
#include <asm/unaligned.h>
#include "dlm_internal.h" #include "dlm_internal.h"
#include "lowcomms.h" #include "lowcomms.h"
#include "config.h" #include "config.h"
#include "lock.h" #include "lock.h"
#include "midcomms.h" #include "midcomms.h"
static void copy_from_cb(void *dst, const void *base, unsigned offset,
unsigned len, unsigned limit)
{
unsigned copy = len;
if ((copy + offset) > limit)
copy = limit - offset;
memcpy(dst, base + offset, copy);
len -= copy;
if (len)
memcpy(dst + copy, base, len);
}
/* /*
* Called from the low-level comms layer to process a buffer of * Called from the low-level comms layer to process a buffer of
* commands. * commands.
*
* Only complete messages are processed here, any "spare" bytes from
* the end of a buffer are saved and tacked onto the front of the next
* message that comes in. I doubt this will happen very often but we
* need to be able to cope with it and I don't want the task to be waiting
* for packets to come in when there is useful work to be done.
*/ */
int dlm_process_incoming_buffer(int nodeid, const void *base, int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
unsigned offset, unsigned len, unsigned limit)
{ {
union { const unsigned char *ptr = buf;
unsigned char __buf[DLM_INBUF_LEN]; const struct dlm_header *hd;
/* this is to force proper alignment on some arches */
union dlm_packet p;
} __tmp;
union dlm_packet *p = &__tmp.p;
int ret = 0;
int err = 0;
uint16_t msglen; uint16_t msglen;
uint32_t lockspace; int ret = 0;
while (len > sizeof(struct dlm_header)) {
/* Copy just the header to check the total length. The
message may wrap around the end of the buffer back to the
start, so we need to use a temp buffer and copy_from_cb. */
copy_from_cb(p, base, offset, sizeof(struct dlm_header),
limit);
msglen = le16_to_cpu(p->header.h_length);
lockspace = p->header.h_lockspace;
err = -EINVAL; while (len >= sizeof(struct dlm_header)) {
if (msglen < sizeof(struct dlm_header)) hd = (struct dlm_header *)ptr;
break;
if (p->header.h_cmd == DLM_MSG) { /* no message should be more than this otherwise we
if (msglen < sizeof(struct dlm_message)) * cannot deliver this message to upper layers
break; */
} else { msglen = get_unaligned_le16(&hd->h_length);
if (msglen < sizeof(struct dlm_rcom)) if (msglen > DEFAULT_BUFFER_SIZE) {
break; log_print("received invalid length header: %u, will abort message parsing",
} msglen);
err = -E2BIG; return -EBADMSG;
if (msglen > dlm_config.ci_buffer_size) {
log_print("message size %d from %d too big, buf len %d",
msglen, nodeid, len);
break;
} }
err = 0;
/* If only part of the full message is contained in this
buffer, then do nothing and wait for lowcomms to call
us again later with more data. We return 0 meaning
we've consumed none of the input buffer. */
/* caller will take care that leftover
* will be parsed next call with more data
*/
if (msglen > len) if (msglen > len)
break; break;
/* Allocate a larger temp buffer if the full message won't fit switch (hd->h_cmd) {
in the buffer on the stack (which should work for most case DLM_MSG:
ordinary messages). */ if (msglen < sizeof(struct dlm_message)) {
log_print("dlm msg too small: %u, will skip this message",
if (msglen > sizeof(__tmp) && p == &__tmp.p) { msglen);
p = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); goto skip;
if (p == NULL) }
return ret;
}
copy_from_cb(p, base, offset, msglen, limit); break;
case DLM_RCOM:
if (msglen < sizeof(struct dlm_rcom)) {
log_print("dlm rcom msg too small: %u, will skip this message",
msglen);
goto skip;
}
BUG_ON(lockspace != p->header.h_lockspace); break;
default:
log_print("unsupported h_cmd received: %u, will skip this message",
hd->h_cmd);
goto skip;
}
/* for aligned memory access, we just copy current message
* to begin of the buffer which contains already parsed buffer
* data and should provide align access for upper layers
* because the start address of the buffer has a aligned
* address. This memmove can be removed when the upperlayer
* is capable of unaligned memory access.
*/
memmove(buf, ptr, msglen);
dlm_receive_buffer((union dlm_packet *)buf, nodeid);
skip:
ret += msglen; ret += msglen;
offset += msglen;
offset &= (limit - 1);
len -= msglen; len -= msglen;
ptr += msglen;
dlm_receive_buffer(p, nodeid);
} }
if (p != &__tmp.p) return ret;
kfree(p);
return err ? err : ret;
} }
...@@ -12,8 +12,7 @@ ...@@ -12,8 +12,7 @@
#ifndef __MIDCOMMS_DOT_H__ #ifndef __MIDCOMMS_DOT_H__
#define __MIDCOMMS_DOT_H__ #define __MIDCOMMS_DOT_H__
int dlm_process_incoming_buffer(int nodeid, const void *base, unsigned offset, int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
unsigned len, unsigned limit);
#endif /* __MIDCOMMS_DOT_H__ */ #endif /* __MIDCOMMS_DOT_H__ */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment