Commit 4f26bda1 authored by Pavel Begunkov's avatar Pavel Begunkov Committed by Jens Axboe

io-wq: add an option to cancel all matched reqs

This adds support for cancelling all io-wq works matching a predicate.
It isn't used yet, so no change in observable behaviour.
Signed-off-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent f4c2665e
...@@ -903,13 +903,15 @@ void io_wq_cancel_all(struct io_wq *wq) ...@@ -903,13 +903,15 @@ void io_wq_cancel_all(struct io_wq *wq)
struct io_cb_cancel_data { struct io_cb_cancel_data {
work_cancel_fn *fn; work_cancel_fn *fn;
void *data; void *data;
int nr_running;
int nr_pending;
bool cancel_all;
}; };
static bool io_wq_worker_cancel(struct io_worker *worker, void *data) static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{ {
struct io_cb_cancel_data *match = data; struct io_cb_cancel_data *match = data;
unsigned long flags; unsigned long flags;
bool ret = false;
/* /*
* Hold the lock to avoid ->cur_work going out of scope, caller * Hold the lock to avoid ->cur_work going out of scope, caller
...@@ -920,55 +922,55 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) ...@@ -920,55 +922,55 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
!(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) && !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
match->fn(worker->cur_work, match->data)) { match->fn(worker->cur_work, match->data)) {
send_sig(SIGINT, worker->task, 1); send_sig(SIGINT, worker->task, 1);
ret = true; match->nr_running++;
} }
spin_unlock_irqrestore(&worker->lock, flags); spin_unlock_irqrestore(&worker->lock, flags);
return ret; return match->nr_running && !match->cancel_all;
} }
static bool io_wqe_cancel_pending_work(struct io_wqe *wqe, static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match) struct io_cb_cancel_data *match)
{ {
struct io_wq_work_node *node, *prev; struct io_wq_work_node *node, *prev;
struct io_wq_work *work; struct io_wq_work *work;
unsigned long flags; unsigned long flags;
bool found = false;
retry:
spin_lock_irqsave(&wqe->lock, flags); spin_lock_irqsave(&wqe->lock, flags);
wq_list_for_each(node, prev, &wqe->work_list) { wq_list_for_each(node, prev, &wqe->work_list) {
work = container_of(node, struct io_wq_work, list); work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
if (match->fn(work, match->data)) { wq_list_del(&wqe->work_list, node, prev);
wq_list_del(&wqe->work_list, node, prev); spin_unlock_irqrestore(&wqe->lock, flags);
found = true; io_run_cancel(work, wqe);
break; match->nr_pending++;
} if (!match->cancel_all)
return;
/* not safe to continue after unlock */
goto retry;
} }
spin_unlock_irqrestore(&wqe->lock, flags); spin_unlock_irqrestore(&wqe->lock, flags);
if (found)
io_run_cancel(work, wqe);
return found;
} }
static bool io_wqe_cancel_running_work(struct io_wqe *wqe, static void io_wqe_cancel_running_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match) struct io_cb_cancel_data *match)
{ {
bool found;
rcu_read_lock(); rcu_read_lock();
found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match); io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
rcu_read_unlock(); rcu_read_unlock();
return found;
} }
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
void *data) void *data, bool cancel_all)
{ {
struct io_cb_cancel_data match = { struct io_cb_cancel_data match = {
.fn = cancel, .fn = cancel,
.data = data, .data = data,
.cancel_all = cancel_all,
}; };
int node; int node;
...@@ -980,7 +982,8 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, ...@@ -980,7 +982,8 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
for_each_node(node) { for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node]; struct io_wqe *wqe = wq->wqes[node];
if (io_wqe_cancel_pending_work(wqe, &match)) io_wqe_cancel_pending_work(wqe, &match);
if (match.nr_pending && !match.cancel_all)
return IO_WQ_CANCEL_OK; return IO_WQ_CANCEL_OK;
} }
...@@ -993,10 +996,15 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, ...@@ -993,10 +996,15 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
for_each_node(node) { for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node]; struct io_wqe *wqe = wq->wqes[node];
if (io_wqe_cancel_running_work(wqe, &match)) io_wqe_cancel_running_work(wqe, &match);
if (match.nr_running && !match.cancel_all)
return IO_WQ_CANCEL_RUNNING; return IO_WQ_CANCEL_RUNNING;
} }
if (match.nr_running)
return IO_WQ_CANCEL_RUNNING;
if (match.nr_pending)
return IO_WQ_CANCEL_OK;
return IO_WQ_CANCEL_NOTFOUND; return IO_WQ_CANCEL_NOTFOUND;
} }
...@@ -1007,7 +1015,7 @@ static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data) ...@@ -1007,7 +1015,7 @@ static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{ {
return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork); return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false);
} }
static bool io_wq_pid_match(struct io_wq_work *work, void *data) static bool io_wq_pid_match(struct io_wq_work *work, void *data)
...@@ -1021,7 +1029,7 @@ enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid) ...@@ -1021,7 +1029,7 @@ enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
{ {
void *data = (void *) (unsigned long) pid; void *data = (void *) (unsigned long) pid;
return io_wq_cancel_cb(wq, io_wq_pid_match, data); return io_wq_cancel_cb(wq, io_wq_pid_match, data, false);
} }
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
......
...@@ -130,7 +130,7 @@ enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid); ...@@ -130,7 +130,7 @@ enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid);
typedef bool (work_cancel_fn)(struct io_wq_work *, void *); typedef bool (work_cancel_fn)(struct io_wq_work *, void *);
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
void *data); void *data, bool cancel_all);
struct task_struct *io_wq_get_task(struct io_wq *wq); struct task_struct *io_wq_get_task(struct io_wq *wq);
......
...@@ -4773,7 +4773,7 @@ static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr) ...@@ -4773,7 +4773,7 @@ static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
enum io_wq_cancel cancel_ret; enum io_wq_cancel cancel_ret;
int ret = 0; int ret = 0;
cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr); cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr, false);
switch (cancel_ret) { switch (cancel_ret) {
case IO_WQ_CANCEL_OK: case IO_WQ_CANCEL_OK:
ret = 0; ret = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment