Commit 67347fe4 authored by Jason Baron's avatar Jason Baron Committed by Linus Torvalds

epoll: do not take global 'epmutex' for simple topologies

When calling EPOLL_CTL_ADD for an epoll file descriptor that is attached
directly to a wakeup source, we do not need to take the global 'epmutex',
unless the epoll file descriptor is nested.  The purpose of taking the
'epmutex' on add is to prevent complex topologies such as loops and deep
wakeup paths from forming in parallel through multiple EPOLL_CTL_ADD
operations.  However, for the simple case of an epoll file descriptor
attached directly to a wakeup source (with no nesting), we do not need to
hold the 'epmutex'.

This patch along with 'epoll: optimize EPOLL_CTL_DEL using rcu' improves
scalability on larger systems.  Quoting Nathan Zimmer's mail on SPECjbb
performance:

"On the 16 socket run the performance went from 35k jOPS to 125k jOPS.  In
addition the benchmark went from scaling well on 10 sockets to scaling
well on just over 40 sockets.

...

Currently the benchmark stops scaling at around 40-44 sockets but it seems like
I found a second unrelated bottleneck."

[akpm@linux-foundation.org: use `bool' for boolean variables, remove unneeded/undesirable cast of void*, add missed ep_scan_ready_list() kerneldoc]
Signed-off-by: default avatarJason Baron <jbaron@akamai.com>
Tested-by: default avatarNathan Zimmer <nzimmer@sgi.com>
Cc: Eric Wong <normalperson@yhbt.net>
Cc: Nelson Elhage <nelhage@nelhage.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Davide Libenzi <davidel@xmailserver.org>
Cc: "Paul E. McKenney" <paulmck@us.ibm.com>
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@linux-foundation.org>
parent ae10b2b4
...@@ -585,14 +585,14 @@ static inline void ep_pm_stay_awake_rcu(struct epitem *epi) ...@@ -585,14 +585,14 @@ static inline void ep_pm_stay_awake_rcu(struct epitem *epi)
* @sproc: Pointer to the scan callback. * @sproc: Pointer to the scan callback.
* @priv: Private opaque data passed to the @sproc callback. * @priv: Private opaque data passed to the @sproc callback.
* @depth: The current depth of recursive f_op->poll calls. * @depth: The current depth of recursive f_op->poll calls.
* @ep_locked: caller already holds ep->mtx
* *
* Returns: The same integer error code returned by the @sproc callback. * Returns: The same integer error code returned by the @sproc callback.
*/ */
static int ep_scan_ready_list(struct eventpoll *ep, static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *, int (*sproc)(struct eventpoll *,
struct list_head *, void *), struct list_head *, void *),
void *priv, void *priv, int depth, bool ep_locked)
int depth)
{ {
int error, pwake = 0; int error, pwake = 0;
unsigned long flags; unsigned long flags;
...@@ -603,7 +603,9 @@ static int ep_scan_ready_list(struct eventpoll *ep, ...@@ -603,7 +603,9 @@ static int ep_scan_ready_list(struct eventpoll *ep,
* We need to lock this because we could be hit by * We need to lock this because we could be hit by
* eventpoll_release_file() and epoll_ctl(). * eventpoll_release_file() and epoll_ctl().
*/ */
mutex_lock_nested(&ep->mtx, depth);
if (!ep_locked)
mutex_lock_nested(&ep->mtx, depth);
/* /*
* Steal the ready list, and re-init the original one to the * Steal the ready list, and re-init the original one to the
...@@ -667,7 +669,8 @@ static int ep_scan_ready_list(struct eventpoll *ep, ...@@ -667,7 +669,8 @@ static int ep_scan_ready_list(struct eventpoll *ep,
} }
spin_unlock_irqrestore(&ep->lock, flags); spin_unlock_irqrestore(&ep->lock, flags);
mutex_unlock(&ep->mtx); if (!ep_locked)
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */ /* We have to call this outside the lock */
if (pwake) if (pwake)
...@@ -822,15 +825,34 @@ static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head, ...@@ -822,15 +825,34 @@ static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head,
return 0; return 0;
} }
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt);
struct readyevents_arg {
struct eventpoll *ep;
bool locked;
};
static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests) static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests)
{ {
return ep_scan_ready_list(priv, ep_read_events_proc, NULL, call_nests + 1); struct readyevents_arg *arg = priv;
return ep_scan_ready_list(arg->ep, ep_read_events_proc, NULL,
call_nests + 1, arg->locked);
} }
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait) static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
{ {
int pollflags; int pollflags;
struct eventpoll *ep = file->private_data; struct eventpoll *ep = file->private_data;
struct readyevents_arg arg;
/*
* During ep_insert() we already hold the ep->mtx for the tfile.
* Prevent re-acquisition.
*/
arg.locked = wait && (wait->_qproc == ep_ptable_queue_proc);
arg.ep = ep;
/* Insert inside our poll wait queue */ /* Insert inside our poll wait queue */
poll_wait(file, &ep->poll_wait, wait); poll_wait(file, &ep->poll_wait, wait);
...@@ -842,7 +864,7 @@ static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait) ...@@ -842,7 +864,7 @@ static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
* could re-enter here. * could re-enter here.
*/ */
pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS, pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
ep_poll_readyevents_proc, ep, ep, current); ep_poll_readyevents_proc, &arg, ep, current);
return pollflags != -1 ? pollflags : 0; return pollflags != -1 ? pollflags : 0;
} }
...@@ -1243,7 +1265,7 @@ static noinline void ep_destroy_wakeup_source(struct epitem *epi) ...@@ -1243,7 +1265,7 @@ static noinline void ep_destroy_wakeup_source(struct epitem *epi)
* Must be called with "mtx" held. * Must be called with "mtx" held.
*/ */
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd) struct file *tfile, int fd, int full_check)
{ {
int error, revents, pwake = 0; int error, revents, pwake = 0;
unsigned long flags; unsigned long flags;
...@@ -1309,7 +1331,7 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event, ...@@ -1309,7 +1331,7 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
/* now check if we've created too many backpaths */ /* now check if we've created too many backpaths */
error = -EINVAL; error = -EINVAL;
if (reverse_path_check()) if (full_check && reverse_path_check())
goto error_remove_epi; goto error_remove_epi;
/* We have to drop the new item inside our item list to keep track of it */ /* We have to drop the new item inside our item list to keep track of it */
...@@ -1532,7 +1554,7 @@ static int ep_send_events(struct eventpoll *ep, ...@@ -1532,7 +1554,7 @@ static int ep_send_events(struct eventpoll *ep,
esed.maxevents = maxevents; esed.maxevents = maxevents;
esed.events = events; esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0); return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
} }
static inline struct timespec ep_set_mstimeout(long ms) static inline struct timespec ep_set_mstimeout(long ms)
...@@ -1802,11 +1824,12 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, ...@@ -1802,11 +1824,12 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event) struct epoll_event __user *, event)
{ {
int error; int error;
int did_lock_epmutex = 0; int full_check = 0;
struct fd f, tf; struct fd f, tf;
struct eventpoll *ep; struct eventpoll *ep;
struct epitem *epi; struct epitem *epi;
struct epoll_event epds; struct epoll_event epds;
struct eventpoll *tep = NULL;
error = -EFAULT; error = -EFAULT;
if (ep_op_has_event(op) && if (ep_op_has_event(op) &&
...@@ -1855,23 +1878,40 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, ...@@ -1855,23 +1878,40 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
* and hang them on the tfile_check_list, so we can check that we * and hang them on the tfile_check_list, so we can check that we
* haven't created too many possible wakeup paths. * haven't created too many possible wakeup paths.
* *
* We need to hold the epmutex across ep_insert to prevent * We do not need to take the global 'epmutex' on EPOLL_CTL_ADD when
* multple adds from creating loops in parallel. * the epoll file descriptor is attaching directly to a wakeup source,
* unless the epoll file descriptor is nested. The purpose of taking the
* 'epmutex' on add is to prevent complex topologies such as loops and
* deep wakeup paths from forming in parallel through multiple
* EPOLL_CTL_ADD operations.
*/ */
mutex_lock_nested(&ep->mtx, 0);
if (op == EPOLL_CTL_ADD) { if (op == EPOLL_CTL_ADD) {
mutex_lock(&epmutex); if (!list_empty(&f.file->f_ep_links) ||
did_lock_epmutex = 1; is_file_epoll(tf.file)) {
if (is_file_epoll(tf.file)) { full_check = 1;
error = -ELOOP; mutex_unlock(&ep->mtx);
if (ep_loop_check(ep, tf.file) != 0) { mutex_lock(&epmutex);
clear_tfile_check_list(); if (is_file_epoll(tf.file)) {
goto error_tgt_fput; error = -ELOOP;
if (ep_loop_check(ep, tf.file) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tf.file->f_tfile_llink,
&tfile_check_list);
mutex_lock_nested(&ep->mtx, 0);
if (is_file_epoll(tf.file)) {
tep = tf.file->private_data;
mutex_lock_nested(&tep->mtx, 1);
} }
} else }
list_add(&tf.file->f_tfile_llink, &tfile_check_list); }
if (op == EPOLL_CTL_DEL && is_file_epoll(tf.file)) {
tep = tf.file->private_data;
mutex_lock_nested(&tep->mtx, 1);
} }
mutex_lock_nested(&ep->mtx, 0);
/* /*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx" * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
...@@ -1885,10 +1925,11 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, ...@@ -1885,10 +1925,11 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
case EPOLL_CTL_ADD: case EPOLL_CTL_ADD:
if (!epi) { if (!epi) {
epds.events |= POLLERR | POLLHUP; epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tf.file, fd); error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else } else
error = -EEXIST; error = -EEXIST;
clear_tfile_check_list(); if (full_check)
clear_tfile_check_list();
break; break;
case EPOLL_CTL_DEL: case EPOLL_CTL_DEL:
if (epi) if (epi)
...@@ -1904,10 +1945,12 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, ...@@ -1904,10 +1945,12 @@ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
error = -ENOENT; error = -ENOENT;
break; break;
} }
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx); mutex_unlock(&ep->mtx);
error_tgt_fput: error_tgt_fput:
if (did_lock_epmutex) if (full_check)
mutex_unlock(&epmutex); mutex_unlock(&epmutex);
fdput(tf); fdput(tf);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment