Commit 91a50030 authored by Oleg Drokin's avatar Oleg Drokin Committed by Greg Kroah-Hartman

staging/lustre/ldlm: split client namespaces into active and inactive

The main reason behind this is ldlm_poold walks all namespaces currently
no matter if there are any locks or not. On large systems this could take
quite a bit of time, esp. since ldlm_poold is currently woken up once per
second.

Now every time a client namespace loses it's last resource it is placed
into an inactive list that is not touched by ldlm_poold as pointless.
On creation of a first resource in a namespace it is placed back into
the active list.

Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2924
Lustre-change: http://review.whamcloud.com/5624Signed-off-by: default avatarOleg Drokin <oleg.drokin@intel.com>
Reviewed-by: default avatarHiroya Nozaki <nozaki.hiroya@jp.fujitsu.com>
Reviewed-by: default avatarNiu Yawei <yawei.niu@intel.com>
Signed-off-by: default avatarPeng Tao <tao.peng@emc.com>
Signed-off-by: default avatarAndreas Dilger <andreas.dilger@intel.com>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent bdf43213
...@@ -1471,8 +1471,6 @@ void ldlm_namespace_free(struct ldlm_namespace *ns, ...@@ -1471,8 +1471,6 @@ void ldlm_namespace_free(struct ldlm_namespace *ns,
struct obd_import *imp, int force); struct obd_import *imp, int force);
void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client); void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client);
void ldlm_namespace_unregister(struct ldlm_namespace *ns, ldlm_side_t client); void ldlm_namespace_unregister(struct ldlm_namespace *ns, ldlm_side_t client);
void ldlm_namespace_move_locked(struct ldlm_namespace *ns, ldlm_side_t client);
struct ldlm_namespace *ldlm_namespace_first_locked(ldlm_side_t client);
void ldlm_namespace_get(struct ldlm_namespace *ns); void ldlm_namespace_get(struct ldlm_namespace *ns);
void ldlm_namespace_put(struct ldlm_namespace *ns); void ldlm_namespace_put(struct ldlm_namespace *ns);
int ldlm_proc_setup(void); int ldlm_proc_setup(void);
......
...@@ -36,23 +36,46 @@ ...@@ -36,23 +36,46 @@
#define MAX_STRING_SIZE 128 #define MAX_STRING_SIZE 128
extern atomic_t ldlm_srv_namespace_nr; extern int ldlm_srv_namespace_nr;
extern atomic_t ldlm_cli_namespace_nr; extern int ldlm_cli_namespace_nr;
extern struct mutex ldlm_srv_namespace_lock; extern struct mutex ldlm_srv_namespace_lock;
extern struct list_head ldlm_srv_namespace_list; extern struct list_head ldlm_srv_namespace_list;
extern struct mutex ldlm_cli_namespace_lock; extern struct mutex ldlm_cli_namespace_lock;
extern struct list_head ldlm_cli_namespace_list; extern struct list_head ldlm_cli_active_namespace_list;
extern struct list_head ldlm_cli_inactive_namespace_list;
static inline atomic_t *ldlm_namespace_nr(ldlm_side_t client) static inline int ldlm_namespace_nr_read(ldlm_side_t client)
{ {
return client == LDLM_NAMESPACE_SERVER ? return client == LDLM_NAMESPACE_SERVER ?
&ldlm_srv_namespace_nr : &ldlm_cli_namespace_nr; ldlm_srv_namespace_nr : ldlm_cli_namespace_nr;
}
static inline void ldlm_namespace_nr_inc(ldlm_side_t client)
{
if (client == LDLM_NAMESPACE_SERVER)
ldlm_srv_namespace_nr++;
else
ldlm_cli_namespace_nr++;
}
static inline void ldlm_namespace_nr_dec(ldlm_side_t client)
{
if (client == LDLM_NAMESPACE_SERVER)
ldlm_srv_namespace_nr--;
else
ldlm_cli_namespace_nr--;
} }
static inline struct list_head *ldlm_namespace_list(ldlm_side_t client) static inline struct list_head *ldlm_namespace_list(ldlm_side_t client)
{ {
return client == LDLM_NAMESPACE_SERVER ? return client == LDLM_NAMESPACE_SERVER ?
&ldlm_srv_namespace_list : &ldlm_cli_namespace_list; &ldlm_srv_namespace_list : &ldlm_cli_active_namespace_list;
}
static inline struct list_head *ldlm_namespace_inactive_list(ldlm_side_t client)
{
return client == LDLM_NAMESPACE_SERVER ?
&ldlm_srv_namespace_list : &ldlm_cli_inactive_namespace_list;
} }
static inline struct mutex *ldlm_namespace_lock(ldlm_side_t client) static inline struct mutex *ldlm_namespace_lock(ldlm_side_t client)
...@@ -61,6 +84,17 @@ static inline struct mutex *ldlm_namespace_lock(ldlm_side_t client) ...@@ -61,6 +84,17 @@ static inline struct mutex *ldlm_namespace_lock(ldlm_side_t client)
&ldlm_srv_namespace_lock : &ldlm_cli_namespace_lock; &ldlm_srv_namespace_lock : &ldlm_cli_namespace_lock;
} }
/* ns_bref is the number of resources in this namespace with the notable
* exception of quota namespaces which have their empty refcount at 1 */
static inline int ldlm_ns_empty(struct ldlm_namespace *ns)
{
return atomic_read(&ns->ns_bref) == 0;
}
void ldlm_namespace_move_to_active_locked(struct ldlm_namespace *, ldlm_side_t);
void ldlm_namespace_move_to_inactive_locked(struct ldlm_namespace *, ldlm_side_t);
struct ldlm_namespace *ldlm_namespace_first_locked(ldlm_side_t);
/* ldlm_request.c */ /* ldlm_request.c */
/* Cancel lru flag, it indicates we cancel aged locks. */ /* Cancel lru flag, it indicates we cancel aged locks. */
enum { enum {
......
...@@ -1039,6 +1039,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, ...@@ -1039,6 +1039,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
{ {
int total = 0, cached = 0, nr_ns; int total = 0, cached = 0, nr_ns;
struct ldlm_namespace *ns; struct ldlm_namespace *ns;
struct ldlm_namespace *ns_old = NULL; /* loop detection */
void *cookie; void *cookie;
if (client == LDLM_NAMESPACE_CLIENT && nr != 0 && if (client == LDLM_NAMESPACE_CLIENT && nr != 0 &&
...@@ -1053,7 +1054,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, ...@@ -1053,7 +1054,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
/* /*
* Find out how many resources we may release. * Find out how many resources we may release.
*/ */
for (nr_ns = atomic_read(ldlm_namespace_nr(client)); for (nr_ns = ldlm_namespace_nr_read(client);
nr_ns > 0; nr_ns--) nr_ns > 0; nr_ns--)
{ {
mutex_lock(ldlm_namespace_lock(client)); mutex_lock(ldlm_namespace_lock(client));
...@@ -1063,8 +1064,23 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, ...@@ -1063,8 +1064,23 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
return 0; return 0;
} }
ns = ldlm_namespace_first_locked(client); ns = ldlm_namespace_first_locked(client);
if (ns == ns_old) {
mutex_unlock(ldlm_namespace_lock(client));
break;
}
if (ldlm_ns_empty(ns)) {
ldlm_namespace_move_to_inactive_locked(ns, client);
mutex_unlock(ldlm_namespace_lock(client));
continue;
}
if (ns_old == NULL)
ns_old = ns;
ldlm_namespace_get(ns); ldlm_namespace_get(ns);
ldlm_namespace_move_locked(ns, client); ldlm_namespace_move_to_active_locked(ns, client);
mutex_unlock(ldlm_namespace_lock(client)); mutex_unlock(ldlm_namespace_lock(client));
total += ldlm_pool_shrink(&ns->ns_pool, 0, gfp_mask); total += ldlm_pool_shrink(&ns->ns_pool, 0, gfp_mask);
ldlm_namespace_put(ns); ldlm_namespace_put(ns);
...@@ -1078,7 +1094,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, ...@@ -1078,7 +1094,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
/* /*
* Shrink at least ldlm_namespace_nr(client) namespaces. * Shrink at least ldlm_namespace_nr(client) namespaces.
*/ */
for (nr_ns = atomic_read(ldlm_namespace_nr(client)); for (nr_ns = ldlm_namespace_nr_read(client) - nr_ns;
nr_ns > 0; nr_ns--) nr_ns > 0; nr_ns--)
{ {
int cancel, nr_locks; int cancel, nr_locks;
...@@ -1099,7 +1115,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, ...@@ -1099,7 +1115,7 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
} }
ns = ldlm_namespace_first_locked(client); ns = ldlm_namespace_first_locked(client);
ldlm_namespace_get(ns); ldlm_namespace_get(ns);
ldlm_namespace_move_locked(ns, client); ldlm_namespace_move_to_active_locked(ns, client);
mutex_unlock(ldlm_namespace_lock(client)); mutex_unlock(ldlm_namespace_lock(client));
nr_locks = ldlm_pool_granted(&ns->ns_pool); nr_locks = ldlm_pool_granted(&ns->ns_pool);
...@@ -1132,6 +1148,7 @@ void ldlm_pools_recalc(ldlm_side_t client) ...@@ -1132,6 +1148,7 @@ void ldlm_pools_recalc(ldlm_side_t client)
{ {
__u32 nr_l = 0, nr_p = 0, l; __u32 nr_l = 0, nr_p = 0, l;
struct ldlm_namespace *ns; struct ldlm_namespace *ns;
struct ldlm_namespace *ns_old = NULL;
int nr, equal = 0; int nr, equal = 0;
/* /*
...@@ -1190,16 +1207,14 @@ void ldlm_pools_recalc(ldlm_side_t client) ...@@ -1190,16 +1207,14 @@ void ldlm_pools_recalc(ldlm_side_t client)
* for _all_ pools. * for _all_ pools.
*/ */
l = LDLM_POOL_HOST_L / l = LDLM_POOL_HOST_L /
atomic_read( ldlm_namespace_nr_read(client);
ldlm_namespace_nr(client));
} else { } else {
/* /*
* All the rest of greedy pools will have * All the rest of greedy pools will have
* all locks in equal parts. * all locks in equal parts.
*/ */
l = (LDLM_POOL_HOST_L - nr_l) / l = (LDLM_POOL_HOST_L - nr_l) /
(atomic_read( (ldlm_namespace_nr_read(client) -
ldlm_namespace_nr(client)) -
nr_p); nr_p);
} }
ldlm_pool_setup(&ns->ns_pool, l); ldlm_pool_setup(&ns->ns_pool, l);
...@@ -1210,7 +1225,7 @@ void ldlm_pools_recalc(ldlm_side_t client) ...@@ -1210,7 +1225,7 @@ void ldlm_pools_recalc(ldlm_side_t client)
/* /*
* Recalc at least ldlm_namespace_nr(client) namespaces. * Recalc at least ldlm_namespace_nr(client) namespaces.
*/ */
for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) { for (nr = ldlm_namespace_nr_read(client); nr > 0; nr--) {
int skip; int skip;
/* /*
* Lock the list, get first @ns in the list, getref, move it * Lock the list, get first @ns in the list, getref, move it
...@@ -1226,6 +1241,30 @@ void ldlm_pools_recalc(ldlm_side_t client) ...@@ -1226,6 +1241,30 @@ void ldlm_pools_recalc(ldlm_side_t client)
} }
ns = ldlm_namespace_first_locked(client); ns = ldlm_namespace_first_locked(client);
if (ns_old == ns) { /* Full pass complete */
mutex_unlock(ldlm_namespace_lock(client));
break;
}
/* We got an empty namespace, need to move it back to inactive
* list.
* The race with parallel resource creation is fine:
* - If they do namespace_get before our check, we fail the
* check and they move this item to the end of the list anyway
* - If we do the check and then they do namespace_get, then
* we move the namespace to inactive and they will move
* it back to active (synchronised by the lock, so no clash
* there).
*/
if (ldlm_ns_empty(ns)) {
ldlm_namespace_move_to_inactive_locked(ns, client);
mutex_unlock(ldlm_namespace_lock(client));
continue;
}
if (ns_old == NULL)
ns_old = ns;
spin_lock(&ns->ns_lock); spin_lock(&ns->ns_lock);
/* /*
* skip ns which is being freed, and we don't want to increase * skip ns which is being freed, and we don't want to increase
...@@ -1239,7 +1278,7 @@ void ldlm_pools_recalc(ldlm_side_t client) ...@@ -1239,7 +1278,7 @@ void ldlm_pools_recalc(ldlm_side_t client)
} }
spin_unlock(&ns->ns_lock); spin_unlock(&ns->ns_lock);
ldlm_namespace_move_locked(ns, client); ldlm_namespace_move_to_active_locked(ns, client);
mutex_unlock(ldlm_namespace_lock(client)); mutex_unlock(ldlm_namespace_lock(client));
/* /*
......
...@@ -48,14 +48,19 @@ ...@@ -48,14 +48,19 @@
struct kmem_cache *ldlm_resource_slab, *ldlm_lock_slab; struct kmem_cache *ldlm_resource_slab, *ldlm_lock_slab;
atomic_t ldlm_srv_namespace_nr = ATOMIC_INIT(0); int ldlm_srv_namespace_nr = 0;
atomic_t ldlm_cli_namespace_nr = ATOMIC_INIT(0); int ldlm_cli_namespace_nr = 0;
struct mutex ldlm_srv_namespace_lock; struct mutex ldlm_srv_namespace_lock;
LIST_HEAD(ldlm_srv_namespace_list); LIST_HEAD(ldlm_srv_namespace_list);
struct mutex ldlm_cli_namespace_lock; struct mutex ldlm_cli_namespace_lock;
LIST_HEAD(ldlm_cli_namespace_list); /* Client Namespaces that have active resources in them.
* Once all resources go away, ldlm_poold moves such namespaces to the
* inactive list */
LIST_HEAD(ldlm_cli_active_namespace_list);
/* Client namespaces that don't have any locks in them */
LIST_HEAD(ldlm_cli_inactive_namespace_list);
proc_dir_entry_t *ldlm_type_proc_dir = NULL; proc_dir_entry_t *ldlm_type_proc_dir = NULL;
proc_dir_entry_t *ldlm_ns_proc_dir = NULL; proc_dir_entry_t *ldlm_ns_proc_dir = NULL;
...@@ -636,7 +641,7 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name, ...@@ -636,7 +641,7 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name,
GOTO(out_hash, rc); GOTO(out_hash, rc);
} }
idx = atomic_read(ldlm_namespace_nr(client)); idx = ldlm_namespace_nr_read(client);
rc = ldlm_pool_init(&ns->ns_pool, ns, idx, client); rc = ldlm_pool_init(&ns->ns_pool, ns, idx, client);
if (rc) { if (rc) {
CERROR("Can't initialize lock pool, rc %d\n", rc); CERROR("Can't initialize lock pool, rc %d\n", rc);
...@@ -953,6 +958,12 @@ void ldlm_namespace_get(struct ldlm_namespace *ns) ...@@ -953,6 +958,12 @@ void ldlm_namespace_get(struct ldlm_namespace *ns)
} }
EXPORT_SYMBOL(ldlm_namespace_get); EXPORT_SYMBOL(ldlm_namespace_get);
/* This is only for callers that care about refcount */
int ldlm_namespace_get_return(struct ldlm_namespace *ns)
{
return atomic_inc_return(&ns->ns_bref);
}
void ldlm_namespace_put(struct ldlm_namespace *ns) void ldlm_namespace_put(struct ldlm_namespace *ns)
{ {
if (atomic_dec_and_lock(&ns->ns_bref, &ns->ns_lock)) { if (atomic_dec_and_lock(&ns->ns_bref, &ns->ns_lock)) {
...@@ -967,8 +978,8 @@ void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client) ...@@ -967,8 +978,8 @@ void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client)
{ {
mutex_lock(ldlm_namespace_lock(client)); mutex_lock(ldlm_namespace_lock(client));
LASSERT(list_empty(&ns->ns_list_chain)); LASSERT(list_empty(&ns->ns_list_chain));
list_add(&ns->ns_list_chain, ldlm_namespace_list(client)); list_add(&ns->ns_list_chain, ldlm_namespace_inactive_list(client));
atomic_inc(ldlm_namespace_nr(client)); ldlm_namespace_nr_inc(client);
mutex_unlock(ldlm_namespace_lock(client)); mutex_unlock(ldlm_namespace_lock(client));
} }
...@@ -981,18 +992,29 @@ void ldlm_namespace_unregister(struct ldlm_namespace *ns, ldlm_side_t client) ...@@ -981,18 +992,29 @@ void ldlm_namespace_unregister(struct ldlm_namespace *ns, ldlm_side_t client)
* using list_empty(&ns->ns_list_chain). This is why it is * using list_empty(&ns->ns_list_chain). This is why it is
* important to use list_del_init() here. */ * important to use list_del_init() here. */
list_del_init(&ns->ns_list_chain); list_del_init(&ns->ns_list_chain);
atomic_dec(ldlm_namespace_nr(client)); ldlm_namespace_nr_dec(client);
mutex_unlock(ldlm_namespace_lock(client)); mutex_unlock(ldlm_namespace_lock(client));
} }
/** Should be called with ldlm_namespace_lock(client) taken. */ /** Should be called with ldlm_namespace_lock(client) taken. */
void ldlm_namespace_move_locked(struct ldlm_namespace *ns, ldlm_side_t client) void ldlm_namespace_move_to_active_locked(struct ldlm_namespace *ns,
ldlm_side_t client)
{ {
LASSERT(!list_empty(&ns->ns_list_chain)); LASSERT(!list_empty(&ns->ns_list_chain));
LASSERT(mutex_is_locked(ldlm_namespace_lock(client))); LASSERT(mutex_is_locked(ldlm_namespace_lock(client)));
list_move_tail(&ns->ns_list_chain, ldlm_namespace_list(client)); list_move_tail(&ns->ns_list_chain, ldlm_namespace_list(client));
} }
/** Should be called with ldlm_namespace_lock(client) taken. */
void ldlm_namespace_move_to_inactive_locked(struct ldlm_namespace *ns,
ldlm_side_t client)
{
LASSERT(!list_empty(&ns->ns_list_chain));
LASSERT(mutex_is_locked(ldlm_namespace_lock(client)));
list_move_tail(&ns->ns_list_chain,
ldlm_namespace_inactive_list(client));
}
/** Should be called with ldlm_namespace_lock(client) taken. */ /** Should be called with ldlm_namespace_lock(client) taken. */
struct ldlm_namespace *ldlm_namespace_first_locked(ldlm_side_t client) struct ldlm_namespace *ldlm_namespace_first_locked(ldlm_side_t client)
{ {
...@@ -1049,6 +1071,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, ...@@ -1049,6 +1071,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
struct ldlm_resource *res; struct ldlm_resource *res;
cfs_hash_bd_t bd; cfs_hash_bd_t bd;
__u64 version; __u64 version;
int ns_refcount = 0;
LASSERT(ns != NULL); LASSERT(ns != NULL);
LASSERT(parent == NULL); LASSERT(parent == NULL);
...@@ -1119,7 +1142,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, ...@@ -1119,7 +1142,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
/* We won! Let's add the resource. */ /* We won! Let's add the resource. */
cfs_hash_bd_add_locked(ns->ns_rs_hash, &bd, &res->lr_hash); cfs_hash_bd_add_locked(ns->ns_rs_hash, &bd, &res->lr_hash);
if (cfs_hash_bd_count_get(&bd) == 1) if (cfs_hash_bd_count_get(&bd) == 1)
ldlm_namespace_get(ns); ns_refcount = ldlm_namespace_get_return(ns);
cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1); cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
...@@ -1145,6 +1168,20 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, ...@@ -1145,6 +1168,20 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
/* We create resource with locked lr_lvb_mutex. */ /* We create resource with locked lr_lvb_mutex. */
mutex_unlock(&res->lr_lvb_mutex); mutex_unlock(&res->lr_lvb_mutex);
/* Let's see if we happened to be the very first resource in this
* namespace. If so, and this is a client namespace, we need to move
* the namespace into the active namespaces list to be patrolled by
* the ldlm_poold.
* A notable exception, for quota namespaces qsd_lib.c already took a
* namespace reference, so it won't be participating in all of this,
* but I guess that's ok since we have no business cancelling quota
* locks anyway */
if (ns_is_client(ns) && ns_refcount == 1) {
mutex_lock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
ldlm_namespace_move_to_active_locked(ns, LDLM_NAMESPACE_CLIENT);
mutex_unlock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
}
return res; return res;
} }
EXPORT_SYMBOL(ldlm_resource_get); EXPORT_SYMBOL(ldlm_resource_get);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment