Commit e9e006f5 authored by Mike Christie's avatar Mike Christie Committed by Jens Axboe

nbd: fix max number of supported devs

This fixes a bug added in 4.10 with commit:

commit 9561a7ad
Author: Josef Bacik <jbacik@fb.com>
Date:   Tue Nov 22 14:04:40 2016 -0500

    nbd: add multi-connection support

that limited the number of devices to 256. Before the patch we could
create 1000s of devices, but the patch switched us from using our
own thread to using a work queue which has a default limit of 256
active works.

The problem is that our recv_work function sits in a loop until
disconnection but only handles IO for one connection. The work is
started when the connection is started/restarted, but if we end up
creating 257 or more connections, the queue_work call just queues
connection257+'s recv_work and that waits for connection 1 - 256's
recv_work to be disconnected and that work instance completing.

Instead of reverting back to kthreads, this has us allocate a
workqueue_struct per device, so we can block in the work.

Cc: stable@vger.kernel.org
Reviewed-by: default avatarJosef Bacik <josef@toxicpanda.com>
Signed-off-by: default avatarMike Christie <mchristi@redhat.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 2da22da5
...@@ -108,6 +108,7 @@ struct nbd_device { ...@@ -108,6 +108,7 @@ struct nbd_device {
struct nbd_config *config; struct nbd_config *config;
struct mutex config_lock; struct mutex config_lock;
struct gendisk *disk; struct gendisk *disk;
struct workqueue_struct *recv_workq;
struct list_head list; struct list_head list;
struct task_struct *task_recv; struct task_struct *task_recv;
...@@ -139,7 +140,6 @@ static struct dentry *nbd_dbg_dir; ...@@ -139,7 +140,6 @@ static struct dentry *nbd_dbg_dir;
static unsigned int nbds_max = 16; static unsigned int nbds_max = 16;
static int max_part = 16; static int max_part = 16;
static struct workqueue_struct *recv_workqueue;
static int part_shift; static int part_shift;
static int nbd_dev_dbg_init(struct nbd_device *nbd); static int nbd_dev_dbg_init(struct nbd_device *nbd);
...@@ -1058,7 +1058,7 @@ static int nbd_reconnect_socket(struct nbd_device *nbd, unsigned long arg) ...@@ -1058,7 +1058,7 @@ static int nbd_reconnect_socket(struct nbd_device *nbd, unsigned long arg)
/* We take the tx_mutex in an error path in the recv_work, so we /* We take the tx_mutex in an error path in the recv_work, so we
* need to queue_work outside of the tx_mutex. * need to queue_work outside of the tx_mutex.
*/ */
queue_work(recv_workqueue, &args->work); queue_work(nbd->recv_workq, &args->work);
atomic_inc(&config->live_connections); atomic_inc(&config->live_connections);
wake_up(&config->conn_wait); wake_up(&config->conn_wait);
...@@ -1159,6 +1159,10 @@ static void nbd_config_put(struct nbd_device *nbd) ...@@ -1159,6 +1159,10 @@ static void nbd_config_put(struct nbd_device *nbd)
kfree(nbd->config); kfree(nbd->config);
nbd->config = NULL; nbd->config = NULL;
if (nbd->recv_workq)
destroy_workqueue(nbd->recv_workq);
nbd->recv_workq = NULL;
nbd->tag_set.timeout = 0; nbd->tag_set.timeout = 0;
nbd->disk->queue->limits.discard_granularity = 0; nbd->disk->queue->limits.discard_granularity = 0;
nbd->disk->queue->limits.discard_alignment = 0; nbd->disk->queue->limits.discard_alignment = 0;
...@@ -1187,6 +1191,14 @@ static int nbd_start_device(struct nbd_device *nbd) ...@@ -1187,6 +1191,14 @@ static int nbd_start_device(struct nbd_device *nbd)
return -EINVAL; return -EINVAL;
} }
nbd->recv_workq = alloc_workqueue("knbd%d-recv",
WQ_MEM_RECLAIM | WQ_HIGHPRI |
WQ_UNBOUND, 0, nbd->index);
if (!nbd->recv_workq) {
dev_err(disk_to_dev(nbd->disk), "Could not allocate knbd recv work queue.\n");
return -ENOMEM;
}
blk_mq_update_nr_hw_queues(&nbd->tag_set, config->num_connections); blk_mq_update_nr_hw_queues(&nbd->tag_set, config->num_connections);
nbd->task_recv = current; nbd->task_recv = current;
...@@ -1217,7 +1229,7 @@ static int nbd_start_device(struct nbd_device *nbd) ...@@ -1217,7 +1229,7 @@ static int nbd_start_device(struct nbd_device *nbd)
INIT_WORK(&args->work, recv_work); INIT_WORK(&args->work, recv_work);
args->nbd = nbd; args->nbd = nbd;
args->index = i; args->index = i;
queue_work(recv_workqueue, &args->work); queue_work(nbd->recv_workq, &args->work);
} }
nbd_size_update(nbd); nbd_size_update(nbd);
return error; return error;
...@@ -1237,8 +1249,10 @@ static int nbd_start_device_ioctl(struct nbd_device *nbd, struct block_device *b ...@@ -1237,8 +1249,10 @@ static int nbd_start_device_ioctl(struct nbd_device *nbd, struct block_device *b
mutex_unlock(&nbd->config_lock); mutex_unlock(&nbd->config_lock);
ret = wait_event_interruptible(config->recv_wq, ret = wait_event_interruptible(config->recv_wq,
atomic_read(&config->recv_threads) == 0); atomic_read(&config->recv_threads) == 0);
if (ret) if (ret) {
sock_shutdown(nbd); sock_shutdown(nbd);
flush_workqueue(nbd->recv_workq);
}
mutex_lock(&nbd->config_lock); mutex_lock(&nbd->config_lock);
nbd_bdev_reset(bdev); nbd_bdev_reset(bdev);
/* user requested, ignore socket errors */ /* user requested, ignore socket errors */
...@@ -1899,6 +1913,12 @@ static void nbd_disconnect_and_put(struct nbd_device *nbd) ...@@ -1899,6 +1913,12 @@ static void nbd_disconnect_and_put(struct nbd_device *nbd)
nbd_disconnect(nbd); nbd_disconnect(nbd);
nbd_clear_sock(nbd); nbd_clear_sock(nbd);
mutex_unlock(&nbd->config_lock); mutex_unlock(&nbd->config_lock);
/*
* Make sure recv thread has finished, so it does not drop the last
* config ref and try to destroy the workqueue from inside the work
* queue.
*/
flush_workqueue(nbd->recv_workq);
if (test_and_clear_bit(NBD_HAS_CONFIG_REF, if (test_and_clear_bit(NBD_HAS_CONFIG_REF,
&nbd->config->runtime_flags)) &nbd->config->runtime_flags))
nbd_config_put(nbd); nbd_config_put(nbd);
...@@ -2283,20 +2303,12 @@ static int __init nbd_init(void) ...@@ -2283,20 +2303,12 @@ static int __init nbd_init(void)
if (nbds_max > 1UL << (MINORBITS - part_shift)) if (nbds_max > 1UL << (MINORBITS - part_shift))
return -EINVAL; return -EINVAL;
recv_workqueue = alloc_workqueue("knbd-recv",
WQ_MEM_RECLAIM | WQ_HIGHPRI |
WQ_UNBOUND, 0);
if (!recv_workqueue)
return -ENOMEM;
if (register_blkdev(NBD_MAJOR, "nbd")) { if (register_blkdev(NBD_MAJOR, "nbd"))
destroy_workqueue(recv_workqueue);
return -EIO; return -EIO;
}
if (genl_register_family(&nbd_genl_family)) { if (genl_register_family(&nbd_genl_family)) {
unregister_blkdev(NBD_MAJOR, "nbd"); unregister_blkdev(NBD_MAJOR, "nbd");
destroy_workqueue(recv_workqueue);
return -EINVAL; return -EINVAL;
} }
nbd_dbg_init(); nbd_dbg_init();
...@@ -2338,7 +2350,6 @@ static void __exit nbd_cleanup(void) ...@@ -2338,7 +2350,6 @@ static void __exit nbd_cleanup(void)
idr_destroy(&nbd_index_idr); idr_destroy(&nbd_index_idr);
genl_unregister_family(&nbd_genl_family); genl_unregister_family(&nbd_genl_family);
destroy_workqueue(recv_workqueue);
unregister_blkdev(NBD_MAJOR, "nbd"); unregister_blkdev(NBD_MAJOR, "nbd");
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment