Commit f0747ebf authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

fs: dlm: simplify writequeue handling

This patch cleans up the current dlm sending allocator handling by using
some named macros, list functionality and removes some goto statements.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent e1a7cbce
...@@ -102,6 +102,9 @@ struct listen_connection { ...@@ -102,6 +102,9 @@ struct listen_connection {
struct work_struct rwork; struct work_struct rwork;
}; };
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
/* An entry waiting to be sent */ /* An entry waiting to be sent */
struct writequeue_entry { struct writequeue_entry {
struct list_head list; struct list_head list;
...@@ -1351,7 +1354,7 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, ...@@ -1351,7 +1354,7 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
{ {
struct writequeue_entry *entry; struct writequeue_entry *entry;
entry = kmalloc(sizeof(struct writequeue_entry), allocation); entry = kzalloc(sizeof(*entry), allocation);
if (!entry) if (!entry)
return NULL; return NULL;
...@@ -1361,62 +1364,62 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, ...@@ -1361,62 +1364,62 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
return NULL; return NULL;
} }
entry->offset = 0;
entry->len = 0;
entry->end = 0;
entry->users = 0;
entry->con = con; entry->con = con;
entry->users = 1;
return entry; return entry;
} }
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
gfp_t allocation, char **ppc)
{ {
struct connection *con;
struct writequeue_entry *e; struct writequeue_entry *e;
int offset = 0;
if (len > DEFAULT_BUFFER_SIZE ||
len < sizeof(struct dlm_header)) {
BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
log_print("failed to allocate a buffer of size %d", len);
WARN_ON(1);
return NULL;
}
con = nodeid2con(nodeid, allocation);
if (!con)
return NULL;
spin_lock(&con->writequeue_lock); spin_lock(&con->writequeue_lock);
e = list_entry(con->writequeue.prev, struct writequeue_entry, list); if (!list_empty(&con->writequeue)) {
if ((&e->list == &con->writequeue) || e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
(PAGE_SIZE - e->end < len)) { if (DLM_WQ_REMAIN_BYTES(e) >= len) {
e = NULL; *ppc = page_address(e->page) + e->end;
} else {
offset = e->end;
e->end += len; e->end += len;
e->users++; e->users++;
}
spin_unlock(&con->writequeue_lock); spin_unlock(&con->writequeue_lock);
if (e) {
got_one:
*ppc = page_address(e->page) + offset;
return e; return e;
} }
}
spin_unlock(&con->writequeue_lock);
e = new_writequeue_entry(con, allocation); e = new_writequeue_entry(con, allocation);
if (e) { if (!e)
spin_lock(&con->writequeue_lock); return NULL;
offset = e->end;
*ppc = page_address(e->page);
e->end += len; e->end += len;
e->users++;
spin_lock(&con->writequeue_lock);
list_add_tail(&e->list, &con->writequeue); list_add_tail(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock); spin_unlock(&con->writequeue_lock);
goto got_one;
return e;
};
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
struct connection *con;
if (len > DEFAULT_BUFFER_SIZE ||
len < sizeof(struct dlm_header)) {
BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
log_print("failed to allocate a buffer of size %d", len);
WARN_ON(1);
return NULL;
} }
con = nodeid2con(nodeid, allocation);
if (!con)
return NULL; return NULL;
return new_wq_entry(con, len, allocation, ppc);
} }
void dlm_lowcomms_commit_buffer(void *mh) void dlm_lowcomms_commit_buffer(void *mh)
...@@ -1429,7 +1432,8 @@ void dlm_lowcomms_commit_buffer(void *mh) ...@@ -1429,7 +1432,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
users = --e->users; users = --e->users;
if (users) if (users)
goto out; goto out;
e->len = e->end - e->offset;
e->len = DLM_WQ_LENGTH_BYTES(e);
spin_unlock(&con->writequeue_lock); spin_unlock(&con->writequeue_lock);
queue_work(send_workqueue, &con->swork); queue_work(send_workqueue, &con->swork);
...@@ -1455,11 +1459,10 @@ static void send_to_sock(struct connection *con) ...@@ -1455,11 +1459,10 @@ static void send_to_sock(struct connection *con)
spin_lock(&con->writequeue_lock); spin_lock(&con->writequeue_lock);
for (;;) { for (;;) {
e = list_entry(con->writequeue.next, struct writequeue_entry, if (list_empty(&con->writequeue))
list);
if ((struct list_head *) e == &con->writequeue)
break; break;
e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
len = e->len; len = e->len;
offset = e->offset; offset = e->offset;
BUG_ON(len == 0 && e->users == 0); BUG_ON(len == 0 && e->users == 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment