Commit 0460fef2 authored by Kent Overstreet's avatar Kent Overstreet Committed by Linus Torvalds

aio: use cancellation list lazily

Cancelling kiocbs requires adding them to a per kioctx linked list,
which is one of the few things we need to take the kioctx lock for in
the fast path.  But most kiocbs can't be cancelled - so if we just do
this lazily, we can avoid quite a bit of locking overhead.

While we're at it, instead of using a flag bit switch to using ki_cancel
itself to indicate that a kiocb has been cancelled/completed.  This lets
us get rid of ki_flags entirely.

[akpm@linux-foundation.org: remove buggy BUG()]
Signed-off-by: default avatarKent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: default avatar"Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@linux-foundation.org>
parent 21b40200
...@@ -533,7 +533,6 @@ static int ep_aio_cancel(struct kiocb *iocb, struct io_event *e) ...@@ -533,7 +533,6 @@ static int ep_aio_cancel(struct kiocb *iocb, struct io_event *e)
local_irq_disable(); local_irq_disable();
epdata = priv->epdata; epdata = priv->epdata;
// spin_lock(&epdata->dev->lock); // spin_lock(&epdata->dev->lock);
kiocbSetCancelled(iocb);
if (likely(epdata && epdata->ep && priv->req)) if (likely(epdata && epdata->ep && priv->req))
value = usb_ep_dequeue (epdata->ep, priv->req); value = usb_ep_dequeue (epdata->ep, priv->req);
else else
...@@ -663,7 +662,7 @@ ep_aio_rwtail( ...@@ -663,7 +662,7 @@ ep_aio_rwtail(
goto fail; goto fail;
} }
iocb->ki_cancel = ep_aio_cancel; kiocb_set_cancel_fn(iocb, ep_aio_cancel);
get_ep(epdata); get_ep(epdata);
priv->epdata = epdata; priv->epdata = epdata;
priv->actual = 0; priv->actual = 0;
......
...@@ -97,6 +97,8 @@ struct kioctx { ...@@ -97,6 +97,8 @@ struct kioctx {
struct aio_ring_info ring_info; struct aio_ring_info ring_info;
spinlock_t completion_lock;
struct rcu_head rcu_head; struct rcu_head rcu_head;
struct work_struct rcu_work; struct work_struct rcu_work;
}; };
...@@ -220,25 +222,51 @@ static int aio_setup_ring(struct kioctx *ctx) ...@@ -220,25 +222,51 @@ static int aio_setup_ring(struct kioctx *ctx)
#define AIO_EVENTS_FIRST_PAGE ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event)) #define AIO_EVENTS_FIRST_PAGE ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
#define AIO_EVENTS_OFFSET (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE) #define AIO_EVENTS_OFFSET (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
{
struct kioctx *ctx = req->ki_ctx;
unsigned long flags;
spin_lock_irqsave(&ctx->ctx_lock, flags);
if (!req->ki_list.next)
list_add(&req->ki_list, &ctx->active_reqs);
req->ki_cancel = cancel;
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
}
EXPORT_SYMBOL(kiocb_set_cancel_fn);
static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb, static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
struct io_event *res) struct io_event *res)
{ {
int (*cancel)(struct kiocb *, struct io_event *); kiocb_cancel_fn *old, *cancel;
int ret = -EINVAL; int ret = -EINVAL;
cancel = kiocb->ki_cancel; /*
kiocbSetCancelled(kiocb); * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
if (cancel) { * actually has a cancel function, hence the cmpxchg()
atomic_inc(&kiocb->ki_users); */
spin_unlock_irq(&ctx->ctx_lock);
cancel = ACCESS_ONCE(kiocb->ki_cancel);
do {
if (!cancel || cancel == KIOCB_CANCELLED)
return ret;
memset(res, 0, sizeof(*res)); old = cancel;
res->obj = (u64)(unsigned long)kiocb->ki_obj.user; cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
res->data = kiocb->ki_user_data; } while (cancel != old);
ret = cancel(kiocb, res);
spin_lock_irq(&ctx->ctx_lock); atomic_inc(&kiocb->ki_users);
} spin_unlock_irq(&ctx->ctx_lock);
memset(res, 0, sizeof(*res));
res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
res->data = kiocb->ki_user_data;
ret = cancel(kiocb, res);
spin_lock_irq(&ctx->ctx_lock);
return ret; return ret;
} }
...@@ -326,6 +354,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) ...@@ -326,6 +354,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->users, 2); atomic_set(&ctx->users, 2);
atomic_set(&ctx->dead, 0); atomic_set(&ctx->dead, 0);
spin_lock_init(&ctx->ctx_lock); spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_info.ring_lock); mutex_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait); init_waitqueue_head(&ctx->wait);
...@@ -468,20 +497,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx) ...@@ -468,20 +497,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
{ {
struct kiocb *req = NULL; struct kiocb *req = NULL;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL); req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req)) if (unlikely(!req))
return NULL; return NULL;
req->ki_flags = 0;
atomic_set(&req->ki_users, 2); atomic_set(&req->ki_users, 2);
req->ki_key = 0;
req->ki_ctx = ctx; req->ki_ctx = ctx;
req->ki_cancel = NULL;
req->ki_retry = NULL;
req->ki_dtor = NULL;
req->private = NULL;
req->ki_iovec = NULL;
req->ki_eventfd = NULL;
return req; return req;
} }
...@@ -512,7 +533,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch) ...@@ -512,7 +533,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
spin_lock_irq(&ctx->ctx_lock); spin_lock_irq(&ctx->ctx_lock);
list_for_each_entry_safe(req, n, &batch->head, ki_batch) { list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
list_del(&req->ki_batch); list_del(&req->ki_batch);
list_del(&req->ki_list);
kmem_cache_free(kiocb_cachep, req); kmem_cache_free(kiocb_cachep, req);
atomic_dec(&ctx->reqs_active); atomic_dec(&ctx->reqs_active);
} }
...@@ -559,10 +579,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch) ...@@ -559,10 +579,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
} }
batch->count -= allocated; batch->count -= allocated;
list_for_each_entry(req, &batch->head, ki_batch) { atomic_add(allocated, &ctx->reqs_active);
list_add(&req->ki_list, &ctx->active_reqs);
atomic_inc(&ctx->reqs_active);
}
kunmap_atomic(ring); kunmap_atomic(ring);
spin_unlock_irq(&ctx->ctx_lock); spin_unlock_irq(&ctx->ctx_lock);
...@@ -653,25 +670,34 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -653,25 +670,34 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
info = &ctx->ring_info; info = &ctx->ring_info;
/* /*
* Add a completion event to the ring buffer. Must be done holding
* ctx->ctx_lock to prevent other code from messing with the tail
* pointer since we might be called from irq context.
*
* Take rcu_read_lock() in case the kioctx is being destroyed, as we * Take rcu_read_lock() in case the kioctx is being destroyed, as we
* need to issue a wakeup after decrementing reqs_active. * need to issue a wakeup after decrementing reqs_active.
*/ */
rcu_read_lock(); rcu_read_lock();
spin_lock_irqsave(&ctx->ctx_lock, flags);
list_del(&iocb->ki_list); /* remove from active_reqs */ if (iocb->ki_list.next) {
unsigned long flags;
spin_lock_irqsave(&ctx->ctx_lock, flags);
list_del(&iocb->ki_list);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
}
/* /*
* cancelled requests don't get events, userland was given one * cancelled requests don't get events, userland was given one
* when the event got cancelled. * when the event got cancelled.
*/ */
if (kiocbIsCancelled(iocb)) if (unlikely(xchg(&iocb->ki_cancel,
KIOCB_CANCELLED) == KIOCB_CANCELLED))
goto put_rq; goto put_rq;
/*
* Add a completion event to the ring buffer. Must be done holding
* ctx->ctx_lock to prevent other code from messing with the tail
* pointer since we might be called from irq context.
*/
spin_lock_irqsave(&ctx->completion_lock, flags);
tail = info->tail; tail = info->tail;
pos = tail + AIO_EVENTS_OFFSET; pos = tail + AIO_EVENTS_OFFSET;
...@@ -705,6 +731,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -705,6 +731,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
kunmap_atomic(ring); kunmap_atomic(ring);
flush_dcache_page(info->ring_pages[0]); flush_dcache_page(info->ring_pages[0]);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
pr_debug("added to ring %p at [%u]\n", iocb, tail); pr_debug("added to ring %p at [%u]\n", iocb, tail);
/* /*
...@@ -731,7 +759,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -731,7 +759,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
if (waitqueue_active(&ctx->wait)) if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait); wake_up(&ctx->wait);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
rcu_read_unlock(); rcu_read_unlock();
} }
EXPORT_SYMBOL(aio_complete); EXPORT_SYMBOL(aio_complete);
...@@ -1216,15 +1243,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, ...@@ -1216,15 +1243,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
req->ki_opcode = iocb->aio_lio_opcode; req->ki_opcode = iocb->aio_lio_opcode;
ret = aio_setup_iocb(req, compat); ret = aio_setup_iocb(req, compat);
if (ret) if (ret)
goto out_put_req; goto out_put_req;
if (unlikely(kiocbIsCancelled(req))) ret = req->ki_retry(req);
ret = -EINTR;
else
ret = req->ki_retry(req);
if (ret != -EIOCBQUEUED) { if (ret != -EIOCBQUEUED) {
/* /*
* There's no easy way to restart the syscall since other AIO's * There's no easy way to restart the syscall since other AIO's
...@@ -1241,10 +1263,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, ...@@ -1241,10 +1263,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0; return 0;
out_put_req: out_put_req:
spin_lock_irq(&ctx->ctx_lock);
list_del(&req->ki_list);
spin_unlock_irq(&ctx->ctx_lock);
atomic_dec(&ctx->reqs_active); atomic_dec(&ctx->reqs_active);
aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */ aio_put_req(req); /* drop i/o ref to req */
......
...@@ -10,17 +10,24 @@ ...@@ -10,17 +10,24 @@
#include <linux/atomic.h> #include <linux/atomic.h>
struct kioctx; struct kioctx;
struct kiocb;
#define KIOCB_SYNC_KEY (~0U) #define KIOCB_SYNC_KEY (~0U)
/* ki_flags bits */ /*
#define KIF_CANCELLED 2 * We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
* cancelled or completed (this makes a certain amount of sense because
#define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags) * successful cancellation - io_cancel() - does deliver the completion to
* userspace).
#define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags) *
* And since most things don't implement kiocb cancellation and we'd really like
* kiocb completion to be lockless when possible, we use ki_cancel to
* synchronize cancellation and completion - we only set it to KIOCB_CANCELLED
* with xchg() or cmpxchg(), see batch_complete_aio() and kiocb_cancel().
*/
#define KIOCB_CANCELLED ((void *) (~0ULL))
#define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags) typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
/* is there a better place to document function pointer methods? */ /* is there a better place to document function pointer methods? */
/** /**
...@@ -48,13 +55,12 @@ struct kioctx; ...@@ -48,13 +55,12 @@ struct kioctx;
* calls may result in undefined behaviour. * calls may result in undefined behaviour.
*/ */
struct kiocb { struct kiocb {
unsigned long ki_flags;
atomic_t ki_users; atomic_t ki_users;
unsigned ki_key; /* id of this request */ unsigned ki_key; /* id of this request */
struct file *ki_filp; struct file *ki_filp;
struct kioctx *ki_ctx; /* may be NULL for sync ops */ struct kioctx *ki_ctx; /* may be NULL for sync ops */
int (*ki_cancel)(struct kiocb *, struct io_event *); kiocb_cancel_fn *ki_cancel;
ssize_t (*ki_retry)(struct kiocb *); ssize_t (*ki_retry)(struct kiocb *);
void (*ki_dtor)(struct kiocb *); void (*ki_dtor)(struct kiocb *);
...@@ -112,6 +118,7 @@ struct mm_struct; ...@@ -112,6 +118,7 @@ struct mm_struct;
extern void exit_aio(struct mm_struct *mm); extern void exit_aio(struct mm_struct *mm);
extern long do_io_submit(aio_context_t ctx_id, long nr, extern long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user *__user *iocbpp, bool compat); struct iocb __user *__user *iocbpp, bool compat);
void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel);
#else #else
static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; } static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
static inline void aio_put_req(struct kiocb *iocb) { } static inline void aio_put_req(struct kiocb *iocb) { }
...@@ -121,6 +128,8 @@ static inline void exit_aio(struct mm_struct *mm) { } ...@@ -121,6 +128,8 @@ static inline void exit_aio(struct mm_struct *mm) { }
static inline long do_io_submit(aio_context_t ctx_id, long nr, static inline long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user * __user *iocbpp, struct iocb __user * __user *iocbpp,
bool compat) { return 0; } bool compat) { return 0; }
static inline void kiocb_set_cancel_fn(struct kiocb *req,
kiocb_cancel_fn *cancel) { }
#endif /* CONFIG_AIO */ #endif /* CONFIG_AIO */
static inline struct kiocb *list_kiocb(struct list_head *h) static inline struct kiocb *list_kiocb(struct list_head *h)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment