Commit 3e0249f9 authored by Zach Brown's avatar Zach Brown Committed by Andy Grover

RDS/IB: add refcount tracking to struct rds_ib_device

The RDS IB client .remove callback used to free the rds_ibdev for the given
device unconditionally.  This could race other users of the struct.  This patch
adds refcounting so that we only free the rds_ibdev once all of its users are
done.

Many rds_ibdev users are tied to connections.  We give the connection a
reference and change these users to reference the device in the connection
instead of looking it up in the IB client data.  The only user of the IB client
data remaining is the first lookup of the device as connections are built up.

Incrementing the reference count of a device found in the IB client data could
race with final freeing so we use an RCU grace period to make sure that freeing
won't happen until those lookups are done.

MRs need the rds_ibdev to get at the pool that they're freed in to.  They exist
outside a connection and many MRs can reference different devices from one
socket, so it was natural to have each MR hold a reference.  MR refs can be
dropped from interrupt handlers and final device teardown can block so we push
it off to a work struct.  Pool teardown had to be fixed to cancel its pending
work instead of deadlocking waiting for all queued work, including itself, to
finish.

MRs get their reference from the global device list, which gets a reference.
It is left unprotected by locks and remains racy.  A simple global lock would
be a significant bottleneck.  More scalable (complicated) locking should be
done carefully in a later patch.
Signed-off-by: default avatarZach Brown <zach.brown@oracle.com>
parent 89bf9d41
...@@ -59,6 +59,38 @@ struct list_head rds_ib_devices; ...@@ -59,6 +59,38 @@ struct list_head rds_ib_devices;
DEFINE_SPINLOCK(ib_nodev_conns_lock); DEFINE_SPINLOCK(ib_nodev_conns_lock);
LIST_HEAD(ib_nodev_conns); LIST_HEAD(ib_nodev_conns);
/*
* rds_ib_destroy_mr_pool() blocks on a few things and mrs drop references
* from interrupt context so we push freing off into a work struct in krdsd.
*/
static void rds_ib_dev_free(struct work_struct *work)
{
struct rds_ib_ipaddr *i_ipaddr, *i_next;
struct rds_ib_device *rds_ibdev = container_of(work,
struct rds_ib_device, free_work);
if (rds_ibdev->mr_pool)
rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
if (rds_ibdev->mr)
ib_dereg_mr(rds_ibdev->mr);
if (rds_ibdev->pd)
ib_dealloc_pd(rds_ibdev->pd);
list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
list_del(&i_ipaddr->list);
kfree(i_ipaddr);
}
kfree(rds_ibdev);
}
void rds_ib_dev_put(struct rds_ib_device *rds_ibdev)
{
BUG_ON(atomic_read(&rds_ibdev->refcount) <= 0);
if (atomic_dec_and_test(&rds_ibdev->refcount))
queue_work(rds_wq, &rds_ibdev->free_work);
}
void rds_ib_add_one(struct ib_device *device) void rds_ib_add_one(struct ib_device *device)
{ {
struct rds_ib_device *rds_ibdev; struct rds_ib_device *rds_ibdev;
...@@ -77,11 +109,14 @@ void rds_ib_add_one(struct ib_device *device) ...@@ -77,11 +109,14 @@ void rds_ib_add_one(struct ib_device *device)
goto free_attr; goto free_attr;
} }
rds_ibdev = kmalloc_node(sizeof *rds_ibdev, GFP_KERNEL, ibdev_to_node(device)); rds_ibdev = kzalloc_node(sizeof(struct rds_ib_device), GFP_KERNEL,
ibdev_to_node(device));
if (!rds_ibdev) if (!rds_ibdev)
goto free_attr; goto free_attr;
spin_lock_init(&rds_ibdev->spinlock); spin_lock_init(&rds_ibdev->spinlock);
atomic_set(&rds_ibdev->refcount, 1);
INIT_WORK(&rds_ibdev->free_work, rds_ib_dev_free);
rds_ibdev->max_wrs = dev_attr->max_qp_wr; rds_ibdev->max_wrs = dev_attr->max_qp_wr;
rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE); rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
...@@ -96,67 +131,93 @@ void rds_ib_add_one(struct ib_device *device) ...@@ -96,67 +131,93 @@ void rds_ib_add_one(struct ib_device *device)
rds_ibdev->dev = device; rds_ibdev->dev = device;
rds_ibdev->pd = ib_alloc_pd(device); rds_ibdev->pd = ib_alloc_pd(device);
if (IS_ERR(rds_ibdev->pd)) if (IS_ERR(rds_ibdev->pd)) {
goto free_dev; rds_ibdev->pd = NULL;
goto put_dev;
}
rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, IB_ACCESS_LOCAL_WRITE);
IB_ACCESS_LOCAL_WRITE); if (IS_ERR(rds_ibdev->mr)) {
if (IS_ERR(rds_ibdev->mr)) rds_ibdev->mr = NULL;
goto err_pd; goto put_dev;
}
rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev); rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
if (IS_ERR(rds_ibdev->mr_pool)) { if (IS_ERR(rds_ibdev->mr_pool)) {
rds_ibdev->mr_pool = NULL; rds_ibdev->mr_pool = NULL;
goto err_mr; goto put_dev;
} }
INIT_LIST_HEAD(&rds_ibdev->ipaddr_list); INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
INIT_LIST_HEAD(&rds_ibdev->conn_list); INIT_LIST_HEAD(&rds_ibdev->conn_list);
list_add_tail(&rds_ibdev->list, &rds_ib_devices); list_add_tail(&rds_ibdev->list, &rds_ib_devices);
atomic_inc(&rds_ibdev->refcount);
ib_set_client_data(device, &rds_ib_client, rds_ibdev); ib_set_client_data(device, &rds_ib_client, rds_ibdev);
atomic_inc(&rds_ibdev->refcount);
goto free_attr; put_dev:
rds_ib_dev_put(rds_ibdev);
err_mr:
ib_dereg_mr(rds_ibdev->mr);
err_pd:
ib_dealloc_pd(rds_ibdev->pd);
free_dev:
kfree(rds_ibdev);
free_attr: free_attr:
kfree(dev_attr); kfree(dev_attr);
} }
/*
* New connections use this to find the device to associate with the
* connection. It's not in the fast path so we're not concerned about the
* performance of the IB call. (As of this writing, it uses an interrupt
* blocking spinlock to serialize walking a per-device list of all registered
* clients.)
*
* RCU is used to handle incoming connections racing with device teardown.
* Rather than use a lock to serialize removal from the client_data and
* getting a new reference, we use an RCU grace period. The destruction
* path removes the device from client_data and then waits for all RCU
* readers to finish.
*
* A new connection can get NULL from this if its arriving on a
* device that is in the process of being removed.
*/
struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device)
{
struct rds_ib_device *rds_ibdev;
rcu_read_lock();
rds_ibdev = ib_get_client_data(device, &rds_ib_client);
if (rds_ibdev)
atomic_inc(&rds_ibdev->refcount);
rcu_read_unlock();
return rds_ibdev;
}
/*
* The IB stack is letting us know that a device is going away. This can
* happen if the underlying HCA driver is removed or if PCI hotplug is removing
* the pci function, for example.
*
* This can be called at any time and can be racing with any other RDS path.
*/
void rds_ib_remove_one(struct ib_device *device) void rds_ib_remove_one(struct ib_device *device)
{ {
struct rds_ib_device *rds_ibdev; struct rds_ib_device *rds_ibdev;
struct rds_ib_ipaddr *i_ipaddr, *i_next;
rds_ibdev = ib_get_client_data(device, &rds_ib_client); rds_ibdev = ib_get_client_data(device, &rds_ib_client);
if (!rds_ibdev) if (!rds_ibdev)
return; return;
synchronize_rcu();
list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
list_del(&i_ipaddr->list);
kfree(i_ipaddr);
}
rds_ib_destroy_conns(rds_ibdev); rds_ib_destroy_conns(rds_ibdev);
if (rds_ibdev->mr_pool) /*
rds_ib_destroy_mr_pool(rds_ibdev->mr_pool); * prevent future connection attempts from getting a reference to this
* device and wait for currently racing connection attempts to finish
ib_dereg_mr(rds_ibdev->mr); * getting their reference
*/
while (ib_dealloc_pd(rds_ibdev->pd)) { ib_set_client_data(device, &rds_ib_client, NULL);
rdsdebug("Failed to dealloc pd %p\n", rds_ibdev->pd); synchronize_rcu();
msleep(1); rds_ib_dev_put(rds_ibdev);
}
list_del(&rds_ibdev->list); list_del(&rds_ibdev->list);
kfree(rds_ibdev); rds_ib_dev_put(rds_ibdev);
} }
struct ib_client rds_ib_client = { struct ib_client rds_ib_client = {
...@@ -190,7 +251,7 @@ static int rds_ib_conn_info_visitor(struct rds_connection *conn, ...@@ -190,7 +251,7 @@ static int rds_ib_conn_info_visitor(struct rds_connection *conn,
rdma_addr_get_sgid(dev_addr, (union ib_gid *) &iinfo->src_gid); rdma_addr_get_sgid(dev_addr, (union ib_gid *) &iinfo->src_gid);
rdma_addr_get_dgid(dev_addr, (union ib_gid *) &iinfo->dst_gid); rdma_addr_get_dgid(dev_addr, (union ib_gid *) &iinfo->dst_gid);
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); rds_ibdev = ic->rds_ibdev;
iinfo->max_send_wr = ic->i_send_ring.w_nr; iinfo->max_send_wr = ic->i_send_ring.w_nr;
iinfo->max_recv_wr = ic->i_recv_ring.w_nr; iinfo->max_recv_wr = ic->i_recv_ring.w_nr;
iinfo->max_send_sge = rds_ibdev->max_sge; iinfo->max_send_sge = rds_ibdev->max_sge;
......
...@@ -167,6 +167,8 @@ struct rds_ib_device { ...@@ -167,6 +167,8 @@ struct rds_ib_device {
unsigned int max_initiator_depth; unsigned int max_initiator_depth;
unsigned int max_responder_resources; unsigned int max_responder_resources;
spinlock_t spinlock; /* protect the above */ spinlock_t spinlock; /* protect the above */
atomic_t refcount;
struct work_struct free_work;
}; };
#define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus) #define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus)
...@@ -251,6 +253,8 @@ static inline void rds_ib_dma_sync_sg_for_device(struct ib_device *dev, ...@@ -251,6 +253,8 @@ static inline void rds_ib_dma_sync_sg_for_device(struct ib_device *dev,
extern struct rds_transport rds_ib_transport; extern struct rds_transport rds_ib_transport;
extern void rds_ib_add_one(struct ib_device *device); extern void rds_ib_add_one(struct ib_device *device);
extern void rds_ib_remove_one(struct ib_device *device); extern void rds_ib_remove_one(struct ib_device *device);
struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
extern struct ib_client rds_ib_client; extern struct ib_client rds_ib_client;
extern unsigned int fmr_pool_size; extern unsigned int fmr_pool_size;
......
...@@ -95,7 +95,6 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even ...@@ -95,7 +95,6 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
{ {
const struct rds_ib_connect_private *dp = NULL; const struct rds_ib_connect_private *dp = NULL;
struct rds_ib_connection *ic = conn->c_transport_data; struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_ib_device *rds_ibdev;
struct ib_qp_attr qp_attr; struct ib_qp_attr qp_attr;
int err; int err;
...@@ -145,12 +144,11 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even ...@@ -145,12 +144,11 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
if (err) if (err)
printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err); printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err);
/* update ib_device with this local ipaddr & conn */ /* update ib_device with this local ipaddr */
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); err = rds_ib_update_ipaddr(ic->rds_ibdev, conn->c_laddr);
err = rds_ib_update_ipaddr(rds_ibdev, conn->c_laddr);
if (err) if (err)
printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n", err); printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n",
rds_ib_add_conn(rds_ibdev, conn); err);
/* If the peer gave us the last packet it saw, process this as if /* If the peer gave us the last packet it saw, process this as if
* we had received a regular ACK. */ * we had received a regular ACK. */
...@@ -168,12 +166,10 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn, ...@@ -168,12 +166,10 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
u32 max_initiator_depth) u32 max_initiator_depth)
{ {
struct rds_ib_connection *ic = conn->c_transport_data; struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_ib_device *rds_ibdev; struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
memset(conn_param, 0, sizeof(struct rdma_conn_param)); memset(conn_param, 0, sizeof(struct rdma_conn_param));
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
conn_param->responder_resources = conn_param->responder_resources =
min_t(u32, rds_ibdev->max_responder_resources, max_responder_resources); min_t(u32, rds_ibdev->max_responder_resources, max_responder_resources);
conn_param->initiator_depth = conn_param->initiator_depth =
...@@ -241,18 +237,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn) ...@@ -241,18 +237,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
struct rds_ib_device *rds_ibdev; struct rds_ib_device *rds_ibdev;
int ret; int ret;
/* rds_ib_add_one creates a rds_ib_device object per IB device, /*
* and allocates a protection domain, memory range and FMR pool * It's normal to see a null device if an incoming connection races
* for each. If that fails for any reason, it will not register * with device removal, so we don't print a warning.
* the rds_ibdev at all.
*/ */
rds_ibdev = ib_get_client_data(dev, &rds_ib_client); rds_ibdev = rds_ib_get_client_data(dev);
if (!rds_ibdev) { if (!rds_ibdev)
if (printk_ratelimit())
printk(KERN_NOTICE "RDS/IB: No client_data for device %s\n",
dev->name);
return -EOPNOTSUPP; return -EOPNOTSUPP;
}
/* add the conn now so that connection establishment has the dev */
rds_ib_add_conn(rds_ibdev, conn);
if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1) if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1); rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
...@@ -371,6 +365,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) ...@@ -371,6 +365,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_send_cq, ic->i_recv_cq); ic->i_send_cq, ic->i_recv_cq);
out: out:
rds_ib_dev_put(rds_ibdev);
return ret; return ret;
} }
......
...@@ -87,6 +87,7 @@ static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr) ...@@ -87,6 +87,7 @@ static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
rcu_read_lock(); rcu_read_lock();
list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) { list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
if (i_ipaddr->ipaddr == ipaddr) { if (i_ipaddr->ipaddr == ipaddr) {
atomic_inc(&rds_ibdev->refcount);
rcu_read_unlock(); rcu_read_unlock();
return rds_ibdev; return rds_ibdev;
} }
...@@ -141,8 +142,10 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) ...@@ -141,8 +142,10 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
struct rds_ib_device *rds_ibdev_old; struct rds_ib_device *rds_ibdev_old;
rds_ibdev_old = rds_ib_get_device(ipaddr); rds_ibdev_old = rds_ib_get_device(ipaddr);
if (rds_ibdev_old) if (rds_ibdev_old) {
rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr); rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
rds_ib_dev_put(rds_ibdev_old);
}
return rds_ib_add_ipaddr(rds_ibdev, ipaddr); return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
} }
...@@ -163,6 +166,7 @@ void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *con ...@@ -163,6 +166,7 @@ void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *con
spin_unlock_irq(&ib_nodev_conns_lock); spin_unlock_irq(&ib_nodev_conns_lock);
ic->rds_ibdev = rds_ibdev; ic->rds_ibdev = rds_ibdev;
atomic_inc(&rds_ibdev->refcount);
} }
void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
...@@ -182,6 +186,7 @@ void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection * ...@@ -182,6 +186,7 @@ void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *
spin_unlock(&ib_nodev_conns_lock); spin_unlock(&ib_nodev_conns_lock);
ic->rds_ibdev = NULL; ic->rds_ibdev = NULL;
rds_ib_dev_put(rds_ibdev);
} }
void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock) void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock)
...@@ -240,7 +245,7 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co ...@@ -240,7 +245,7 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool) void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
{ {
flush_workqueue(rds_wq); cancel_work_sync(&pool->flush_worker);
rds_ib_flush_mr_pool(pool, 1); rds_ib_flush_mr_pool(pool, 1);
WARN_ON(atomic_read(&pool->item_count)); WARN_ON(atomic_read(&pool->item_count));
WARN_ON(atomic_read(&pool->free_pinned)); WARN_ON(atomic_read(&pool->free_pinned));
...@@ -597,6 +602,8 @@ void rds_ib_free_mr(void *trans_private, int invalidate) ...@@ -597,6 +602,8 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
queue_work(rds_wq, &pool->flush_worker); queue_work(rds_wq, &pool->flush_worker);
} }
} }
rds_ib_dev_put(rds_ibdev);
} }
void rds_ib_flush_mrs(void) void rds_ib_flush_mrs(void)
...@@ -640,6 +647,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, ...@@ -640,6 +647,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret); printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
ibmr->device = rds_ibdev; ibmr->device = rds_ibdev;
rds_ibdev = NULL;
out: out:
if (ret) { if (ret) {
...@@ -647,5 +655,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, ...@@ -647,5 +655,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
rds_ib_free_mr(ibmr, 0); rds_ib_free_mr(ibmr, 0);
ibmr = ERR_PTR(ret); ibmr = ERR_PTR(ret);
} }
if (rds_ibdev)
rds_ib_dev_put(rds_ibdev);
return ibmr; return ibmr;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment