Commit 5231f765 authored by Lai Siyao's avatar Lai Siyao Committed by Greg Kroah-Hartman

staging: lustre: statahead: small fixes and cleanup

small fixes:
 * when 'unplug' is set for ll_statahead(), sa_put() shouldn't kill
   the entry found, because its inflight RPC may not finish yet.
 * remove 'sai_generation', add 'lli_sa_generation' because the
   former one is not safe to access without lock.
 * revalidate_statahead_dentry() may fail to wait for statahead
   entry to become ready, in this case it should not release this
   entry, because it may be used by inflight statahead RPC.

cleanups:
 * rename ll_statahead_enter() to ll_statahead().
 * move dentry 'lld_sa_generation' update to ll_statahead() to
   simplify code and logic.
 * other small cleanups.
Signed-off-by: default avatarLai Siyao <lai.siyao@intel.com>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-3270
Reviewed-on: http://review.whamcloud.com/9667
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-6222
Reviewed-on: http://review.whamcloud.com/13708Reviewed-by: default avatarFan Yong <fan.yong@intel.com>
Reviewed-by: default avatarBobi Jam <bobijam@hotmail.com>
Reviewed-by: default avatarJames Simmons <uja.ornl@gmail.com>
Reviewed-by: default avatarOleg Drokin <oleg.drokin@intel.com>
Signed-off-by: default avatarJames Simmons <jsimmons@infradead.org>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 9ca6fb12
......@@ -278,14 +278,13 @@ static int ll_revalidate_dentry(struct dentry *dentry,
if (lookup_flags & (LOOKUP_PARENT | LOOKUP_OPEN | LOOKUP_CREATE))
return 1;
if (!dentry_need_statahead(dir, dentry))
if (!dentry_may_statahead(dir, dentry))
return 1;
if (lookup_flags & LOOKUP_RCU)
return -ECHILD;
do_statahead_enter(dir, &dentry, !d_inode(dentry));
ll_statahead_mark(dir, dentry);
ll_statahead(dir, &dentry, !d_inode(dentry));
return 1;
}
......
......@@ -161,7 +161,7 @@ struct ll_inode_info {
/* for directory */
struct {
/* serialize normal readdir and statahead-readdir. */
struct mutex d_readdir_mutex;
struct mutex lli_readdir_mutex;
/* metadata statahead */
/* since parent-child threads can share the same @file
......@@ -169,44 +169,35 @@ struct ll_inode_info {
* case of parent exit before child -- it is me should
* cleanup the dir readahead.
*/
void *d_opendir_key;
struct ll_statahead_info *d_sai;
void *lli_opendir_key;
struct ll_statahead_info *lli_sai;
/* protect statahead stuff. */
spinlock_t d_sa_lock;
spinlock_t lli_sa_lock;
/* "opendir_pid" is the token when lookup/revalidate
* -- I am the owner of dir statahead.
*/
pid_t d_opendir_pid;
pid_t lli_opendir_pid;
/* stat will try to access statahead entries or start
* statahead if this flag is set, and this flag will be
* set upon dir open, and cleared when dir is closed,
* statahead hit ratio is too low, or start statahead
* thread failed.
*/
unsigned int d_sa_enabled:1;
unsigned int lli_sa_enabled:1;
/* generation for statahead */
unsigned int lli_sa_generation;
/* directory stripe information */
struct lmv_stripe_md *d_lsm_md;
struct lmv_stripe_md *lli_lsm_md;
/* striped directory size */
loff_t d_stripe_size;
/* striped directory nlink */
__u64 d_stripe_nlink;
} d;
#define lli_readdir_mutex u.d.d_readdir_mutex
#define lli_opendir_key u.d.d_opendir_key
#define lli_sai u.d.d_sai
#define lli_sa_lock u.d.d_sa_lock
#define lli_sa_enabled u.d.d_sa_enabled
#define lli_opendir_pid u.d.d_opendir_pid
#define lli_lsm_md u.d.d_lsm_md
#define lli_stripe_dir_size u.d.d_stripe_size
#define lli_stripe_dir_nlink u.d.d_stripe_nlink
loff_t lli_stripe_dir_size;
u64 lli_stripe_dir_nlink;
};
/* for non-directory */
struct {
struct mutex f_size_mutex;
char *f_symlink_name;
__u64 f_maxbytes;
struct mutex lli_size_mutex;
char *lli_symlink_name;
__u64 lli_maxbytes;
/*
* struct rw_semaphore {
* signed long count; // align d.d_def_acl
......@@ -214,16 +205,16 @@ struct ll_inode_info {
* struct list_head wait_list;
* }
*/
struct rw_semaphore f_trunc_sem;
struct range_lock_tree f_write_tree;
struct rw_semaphore lli_trunc_sem;
struct range_lock_tree lli_write_tree;
struct rw_semaphore f_glimpse_sem;
unsigned long f_glimpse_time;
struct list_head f_agl_list;
__u64 f_agl_index;
struct rw_semaphore lli_glimpse_sem;
unsigned long lli_glimpse_time;
struct list_head lli_agl_list;
__u64 lli_agl_index;
/* for writepage() only to communicate to fsync */
int f_async_rc;
int lli_async_rc;
/*
* whenever a process try to read/write the file, the
......@@ -233,22 +224,9 @@ struct ll_inode_info {
* so the read/write statistics for jobid will not be
* accurate if the file is shared by different jobs.
*/
char f_jobid[LUSTRE_JOBID_SIZE];
} f;
#define lli_size_mutex u.f.f_size_mutex
#define lli_symlink_name u.f.f_symlink_name
#define lli_maxbytes u.f.f_maxbytes
#define lli_trunc_sem u.f.f_trunc_sem
#define lli_write_tree u.f.f_write_tree
#define lli_glimpse_sem u.f.f_glimpse_sem
#define lli_glimpse_time u.f.f_glimpse_time
#define lli_agl_list u.f.f_agl_list
#define lli_agl_index u.f.f_agl_index
#define lli_async_rc u.f.f_async_rc
#define lli_jobid u.f.f_jobid
} u;
char lli_jobid[LUSTRE_JOBID_SIZE];
};
};
/* XXX: For following frequent used members, although they maybe special
* used for non-directory object, it is some time-wasting to check
......@@ -1095,11 +1073,10 @@ void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
/* per inode struct, for dir only */
struct ll_statahead_info {
struct inode *sai_inode;
struct dentry *sai_dentry;
atomic_t sai_refcount; /* when access this struct, hold
* refcount
*/
unsigned int sai_generation; /* generation for statahead */
unsigned int sai_max; /* max ahead of lookup */
__u64 sai_sent; /* stat requests sent count */
__u64 sai_replied; /* stat requests which received
......@@ -1142,8 +1119,7 @@ struct ll_statahead_info {
atomic_t sai_cache_count; /* entry count in cache */
};
int do_statahead_enter(struct inode *dir, struct dentry **dentry,
int only_unplug);
int ll_statahead(struct inode *dir, struct dentry **dentry, bool unplug);
void ll_authorize_statahead(struct inode *dir, void *key);
void ll_deauthorize_statahead(struct inode *dir, void *key);
......@@ -1175,24 +1151,12 @@ static inline int ll_glimpse_size(struct inode *inode)
return rc;
}
static inline void
ll_statahead_mark(struct inode *dir, struct dentry *dentry)
{
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = lli->lli_sai;
struct ll_dentry_data *ldd = ll_d2d(dentry);
/* not the same process, don't mark */
if (lli->lli_opendir_pid != current_pid())
return;
LASSERT(ldd);
if (sai)
ldd->lld_sa_generation = sai->sai_generation;
}
/*
* dentry may statahead when statahead is enabled and current process has opened
* parent directory, and this dentry hasn't accessed statahead cache before
*/
static inline bool
dentry_need_statahead(struct inode *dir, struct dentry *dentry)
dentry_may_statahead(struct inode *dir, struct dentry *dentry)
{
struct ll_inode_info *lli;
struct ll_dentry_data *ldd;
......@@ -1215,38 +1179,27 @@ dentry_need_statahead(struct inode *dir, struct dentry *dentry)
if (lli->lli_opendir_pid != current_pid())
return false;
ldd = ll_d2d(dentry);
/*
* When stats a dentry, the system trigger more than once "revalidate"
* or "lookup", for "getattr", for "getxattr", and maybe for others.
* Under patchless client mode, the operation intent is not accurate,
* which maybe misguide the statahead thread. For example:
* The "revalidate" call for "getattr" and "getxattr" of a dentry maybe
* have the same operation intent -- "IT_GETATTR".
* In fact, one dentry should has only one chance to interact with the
* statahead thread, otherwise the statahead windows will be confused.
* When stating a dentry, kernel may trigger 'revalidate' or 'lookup'
* multiple times, eg. for 'getattr', 'getxattr' and etc.
* For patchless client, lookup intent is not accurate, which may
* misguide statahead. For example:
* The 'revalidate' call for 'getattr' and 'getxattr' of a dentry will
* have the same intent -- IT_GETATTR, while one dentry should access
* statahead cache once, otherwise statahead windows is messed up.
* The solution is as following:
* Assign "lld_sa_generation" with "sai_generation" when a dentry
* "IT_GETATTR" for the first time, and the subsequent "IT_GETATTR"
* will bypass interacting with statahead thread for checking:
* "lld_sa_generation == lli_sai->sai_generation"
* Assign 'lld_sa_generation' with 'lli_sa_generation' when a dentry
* IT_GETATTR for the first time, and subsequent IT_GETATTR will
* bypass interacting with statahead cache by checking
* 'lld_sa_generation == lli->lli_sa_generation'.
*/
if (ldd && lli->lli_sai &&
ldd->lld_sa_generation == lli->lli_sai->sai_generation)
ldd = ll_d2d(dentry);
if (ldd && ldd->lld_sa_generation == lli->lli_sa_generation)
return false;
return true;
}
static inline int
ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug)
{
if (!dentry_need_statahead(dir, *dentryp))
return -EAGAIN;
return do_statahead_enter(dir, dentryp, only_unplug);
}
/* llite ioctl register support routine */
enum llioc_iter {
LLIOC_CONT = 0,
......
......@@ -522,8 +522,8 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
if (!it || it->it_op == IT_GETXATTR)
it = &lookup_it;
if (it->it_op == IT_GETATTR) {
rc = ll_statahead_enter(parent, &dentry, 0);
if (it->it_op == IT_GETATTR && dentry_may_statahead(parent, dentry)) {
rc = ll_statahead(parent, &dentry, 0);
if (rc == 1) {
if (dentry == save)
retval = NULL;
......@@ -574,11 +574,8 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
retval = NULL;
else
retval = dentry;
out:
if (req)
out:
ptlrpc_req_finished(req);
if (it->it_op == IT_GETATTR && (!retval || retval == dentry))
ll_statahead_mark(parent, dentry);
return retval;
}
......
......@@ -54,12 +54,12 @@ enum se_stat {
/*
* sa_entry is not refcounted: statahead thread allocates it and do async stat,
* and in async stat callback ll_statahead_interpret() will add it into
* sai_cb_entries, later statahead thread will call sa_handle_callback() to
* sai_interim_entries, later statahead thread will call sa_handle_callback() to
* instantiate entry and move it into sai_entries, and then only scanner process
* can access and free it.
*/
struct sa_entry {
/* link into sai_cb_entries or sai_entries */
/* link into sai_interim_entries or sai_entries */
struct list_head se_list;
/* link into sai hash table locally */
struct list_head se_hash;
......@@ -84,23 +84,20 @@ struct sa_entry {
static unsigned int sai_generation;
static DEFINE_SPINLOCK(sai_generation_lock);
/*
* The entry only can be released by the caller, it is necessary to hold lock.
*/
/* sa_entry is ready to use */
static inline int sa_ready(struct sa_entry *entry)
{
smp_rmb();
return (entry->se_state != SA_ENTRY_INIT);
}
/* hash value to put in sai_cache */
static inline int sa_hash(int val)
{
return val & LL_SA_CACHE_MASK;
}
/*
* Insert entry to hash SA table.
*/
/* hash entry into sai_cache */
static inline void
sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
{
......@@ -130,11 +127,13 @@ static inline int agl_should_run(struct ll_statahead_info *sai,
return (inode && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
}
/* statahead window is full */
static inline int sa_sent_full(struct ll_statahead_info *sai)
{
return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
}
/* got async stat replies */
static inline int sa_has_callback(struct ll_statahead_info *sai)
{
return !list_empty(&sai->sai_interim_entries);
......@@ -158,7 +157,7 @@ static inline int sa_low_hit(struct ll_statahead_info *sai)
}
/*
* If the given index is behind of statahead window more than
* if the given index is behind of statahead window more than
* SA_OMITTED_ENTRY_MAX, then it is old.
*/
static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
......@@ -167,9 +166,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
sai->sai_index);
}
/*
* Insert it into sai_entries tail when init.
*/
/* allocate sa_entry and hash it to allow scanner process to find it */
static struct sa_entry *
sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
const char *name, int len)
......@@ -198,7 +195,7 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
entry->se_qstr.len = len;
entry->se_qstr.name = dname;
lli = ll_i2info(sai->sai_inode);
lli = ll_i2info(sai->sai_dentry->d_inode);
spin_lock(&lli->lli_sa_lock);
INIT_LIST_HEAD(&entry->se_list);
sa_rehash(sai, entry);
......@@ -246,7 +243,7 @@ sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
static inline void
sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
LASSERT(!list_empty(&entry->se_hash));
LASSERT(!list_empty(&entry->se_list));
......@@ -271,7 +268,7 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
struct sa_entry *tmp, *next;
if (entry && entry->se_state == SA_ENTRY_SUCC) {
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
sai->sai_hit++;
sai->sai_consecutive_miss = 0;
......@@ -293,6 +290,7 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
break;
sa_kill(sai, tmp);
}
wake_up(&sai->sai_thread.t_ctl_waitq);
}
......@@ -329,7 +327,7 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
static void
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
struct md_enqueue_info *minfo = entry->se_minfo;
struct ptlrpc_request *req = entry->se_req;
bool wakeup;
......@@ -355,14 +353,12 @@ sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
wake_up(&sai->sai_waitq);
}
/*
* Insert inode into the list of sai_agls.
*/
/* Insert inode into the list of sai_agls. */
static void ll_agl_add(struct ll_statahead_info *sai,
struct inode *inode, int index)
{
struct ll_inode_info *child = ll_i2info(inode);
struct ll_inode_info *parent = ll_i2info(sai->sai_inode);
struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
int added = 0;
spin_lock(&child->lli_agl_lock);
......@@ -387,8 +383,9 @@ static void ll_agl_add(struct ll_statahead_info *sai,
}
/* allocate sai */
static struct ll_statahead_info *ll_sai_alloc(void)
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
{
struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
struct ll_statahead_info *sai;
int i;
......@@ -396,14 +393,9 @@ static struct ll_statahead_info *ll_sai_alloc(void)
if (!sai)
return NULL;
sai->sai_dentry = dget(dentry);
atomic_set(&sai->sai_refcount, 1);
spin_lock(&sai_generation_lock);
sai->sai_generation = ++sai_generation;
if (unlikely(sai_generation == 0))
sai->sai_generation = ++sai_generation;
spin_unlock(&sai_generation_lock);
sai->sai_max = LL_SA_RPC_MIN;
sai->sai_index = 1;
init_waitqueue_head(&sai->sai_waitq);
......@@ -420,9 +412,27 @@ static struct ll_statahead_info *ll_sai_alloc(void)
}
atomic_set(&sai->sai_cache_count, 0);
spin_lock(&sai_generation_lock);
lli->lli_sa_generation = ++sai_generation;
if (unlikely(!sai_generation))
lli->lli_sa_generation = ++sai_generation;
spin_unlock(&sai_generation_lock);
return sai;
}
/* free sai */
static inline void ll_sai_free(struct ll_statahead_info *sai)
{
LASSERT(sai->sai_dentry);
dput(sai->sai_dentry);
kfree(sai);
}
/*
* take refcount of sai if sai for @dir exists, which means statahead is on for
* this directory.
*/
static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
{
struct ll_inode_info *lli = ll_i2info(dir);
......@@ -437,12 +447,16 @@ static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
return sai;
}
/*
* put sai refcount after use, if refcount reaches zero, free sai and sa_entries
* attached to it.
*/
static void ll_sai_put(struct ll_statahead_info *sai)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
struct sa_entry *entry, *next;
lli->lli_sai = NULL;
......@@ -460,8 +474,7 @@ static void ll_sai_put(struct ll_statahead_info *sai)
LASSERT(atomic_read(&sai->sai_cache_count) == 0);
LASSERT(list_empty(&sai->sai_agls));
iput(sai->sai_inode);
kfree(sai);
ll_sai_free(sai);
atomic_dec(&sbi->ll_sa_running);
}
}
......@@ -533,7 +546,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
static void sa_instantiate(struct ll_statahead_info *sai,
struct sa_entry *entry)
{
struct inode *dir = sai->sai_inode;
struct inode *dir = sai->sai_dentry->d_inode;
struct inode *child;
struct md_enqueue_info *minfo;
struct lookup_intent *it;
......@@ -609,12 +622,12 @@ static void sa_instantiate(struct ll_statahead_info *sai,
sa_make_ready(sai, entry, rc);
}
/* once there are async stat replies, instantiate sa_entry */
/* once there are async stat replies, instantiate sa_entry from replies */
static void sa_handle_callback(struct ll_statahead_info *sai)
{
struct ll_inode_info *lli;
lli = ll_i2info(sai->sai_inode);
lli = ll_i2info(sai->sai_dentry->d_inode);
while (sa_has_callback(sai)) {
struct sa_entry *entry;
......@@ -631,21 +644,6 @@ static void sa_handle_callback(struct ll_statahead_info *sai)
sa_instantiate(sai, entry);
}
spin_lock(&lli->lli_agl_lock);
while (!agl_list_empty(sai)) {
struct ll_inode_info *clli;
clli = list_entry(sai->sai_agls.next,
struct ll_inode_info, lli_agl_list);
list_del_init(&clli->lli_agl_list);
spin_unlock(&lli->lli_agl_lock);
ll_agl_trigger(&clli->lli_vfs_inode, sai);
spin_lock(&lli->lli_agl_lock);
}
spin_unlock(&lli->lli_agl_lock);
}
/*
......@@ -718,6 +716,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
return rc;
}
/* finish async stat RPC arguments */
static void sa_fini_data(struct md_enqueue_info *minfo,
struct ldlm_enqueue_info *einfo)
{
......@@ -775,6 +774,7 @@ static int sa_prep_data(struct inode *dir, struct inode *child,
return 0;
}
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
struct md_enqueue_info *minfo;
......@@ -786,17 +786,18 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry)
return rc;
rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
if (rc < 0)
if (rc)
sa_fini_data(minfo, einfo);
return rc;
}
/**
* similar to ll_revalidate_it().
* \retval 1 -- dentry valid
* \retval 0 -- will send stat-ahead request
* \retval others -- prepare stat-ahead request failed
* async stat for file found in dcache, similar to .revalidate
*
* \retval 1 dentry valid, no RPC sent
* \retval 0 dentry invalid, will send async stat RPC
* \retval negative number upon error
*/
static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
struct dentry *dentry)
......@@ -831,7 +832,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
}
rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
if (rc < 0) {
if (rc) {
entry->se_inode = NULL;
iput(inode);
sa_fini_data(minfo, einfo);
......@@ -840,6 +841,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
return rc;
}
/* async stat for file with @name */
static void sa_statahead(struct dentry *parent, const char *name, int len)
{
struct inode *dir = d_inode(parent);
......@@ -873,6 +875,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len)
sai->sai_index++;
}
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
{
struct dentry *parent = arg;
......@@ -946,6 +949,7 @@ static int ll_agl_thread(void *arg)
return 0;
}
/* start agl thread */
static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
{
struct ptlrpc_thread *thread = &sai->sai_agl_thread;
......@@ -970,6 +974,7 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
&lwi);
}
/* statahead thread main function */
static int ll_statahead_thread(void *arg)
{
struct dentry *parent = arg;
......@@ -977,7 +982,7 @@ static int ll_statahead_thread(void *arg)
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct ll_statahead_info *sai;
struct ptlrpc_thread *thread;
struct ptlrpc_thread *sa_thread;
struct ptlrpc_thread *agl_thread;
struct page *page = NULL;
__u64 pos = 0;
......@@ -987,9 +992,9 @@ static int ll_statahead_thread(void *arg)
struct l_wait_info lwi = { 0 };
sai = ll_sai_get(dir);
thread = &sai->sai_thread;
sa_thread = &sai->sai_thread;
agl_thread = &sai->sai_agl_thread;
thread->t_pid = current_pid();
sa_thread->t_pid = current_pid();
CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
sai, parent);
......@@ -1007,16 +1012,16 @@ static int ll_statahead_thread(void *arg)
atomic_inc(&sbi->ll_sa_total);
spin_lock(&lli->lli_sa_lock);
if (thread_is_init(thread))
if (thread_is_init(sa_thread))
/* If someone else has changed the thread state
* (e.g. already changed to SVC_STOPPING), we can't just
* blindly overwrite that setting.
*/
thread_set_flags(thread, SVC_RUNNING);
thread_set_flags(sa_thread, SVC_RUNNING);
spin_unlock(&lli->lli_sa_lock);
wake_up(&thread->t_ctl_waitq);
wake_up(&sa_thread->t_ctl_waitq);
while (pos != MDS_DIR_END_OFF && thread_is_running(thread)) {
while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
struct lu_dirpage *dp;
struct lu_dirent *ent;
......@@ -1033,7 +1038,7 @@ static int ll_statahead_thread(void *arg)
dp = page_address(page);
for (ent = lu_dirent_start(dp);
ent && thread_is_running(thread) && !sa_low_hit(sai);
ent && thread_is_running(sa_thread) && !sa_low_hit(sai);
ent = lu_dirent_next(ent)) {
__u64 hash;
int namelen;
......@@ -1082,15 +1087,32 @@ static int ll_statahead_thread(void *arg)
/* wait for spare statahead window */
do {
l_wait_event(thread->t_ctl_waitq,
l_wait_event(sa_thread->t_ctl_waitq,
!sa_sent_full(sai) ||
sa_has_callback(sai) ||
!list_empty(&sai->sai_agls) ||
!thread_is_running(thread),
!thread_is_running(sa_thread),
&lwi);
sa_handle_callback(sai);
spin_lock(&lli->lli_agl_lock);
while (sa_sent_full(sai) &&
!agl_list_empty(sai)) {
struct ll_inode_info *clli;
clli = list_entry(sai->sai_agls.next,
struct ll_inode_info, lli_agl_list);
list_del_init(&clli->lli_agl_list);
spin_unlock(&lli->lli_agl_lock);
ll_agl_trigger(&clli->lli_vfs_inode,
sai);
spin_lock(&lli->lli_agl_lock);
}
spin_unlock(&lli->lli_agl_lock);
} while (sa_sent_full(sai) &&
thread_is_running(thread));
thread_is_running(sa_thread));
sa_statahead(parent, name, namelen);
}
......@@ -1113,7 +1135,7 @@ static int ll_statahead_thread(void *arg)
if (rc < 0) {
spin_lock(&lli->lli_sa_lock);
thread_set_flags(thread, SVC_STOPPING);
thread_set_flags(sa_thread, SVC_STOPPING);
lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock);
}
......@@ -1122,11 +1144,11 @@ static int ll_statahead_thread(void *arg)
* statahead is finished, but statahead entries need to be cached, wait
* for file release to stop me.
*/
while (thread_is_running(thread)) {
l_wait_event(thread->t_ctl_waitq,
while (thread_is_running(sa_thread)) {
l_wait_event(sa_thread->t_ctl_waitq,
sa_has_callback(sai) ||
!agl_list_empty(sai) ||
!thread_is_running(thread),
!thread_is_running(sa_thread),
&lwi);
sa_handle_callback(sai);
......@@ -1156,7 +1178,7 @@ static int ll_statahead_thread(void *arg)
/* in case we're not woken up, timeout wait */
lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3),
NULL, NULL);
l_wait_event(thread->t_ctl_waitq,
l_wait_event(sa_thread->t_ctl_waitq,
sai->sai_sent == sai->sai_replied, &lwi);
}
......@@ -1164,19 +1186,20 @@ static int ll_statahead_thread(void *arg)
sa_handle_callback(sai);
spin_lock(&lli->lli_sa_lock);
thread_set_flags(thread, SVC_STOPPED);
thread_set_flags(sa_thread, SVC_STOPPED);
spin_unlock(&lli->lli_sa_lock);
wake_up(&sai->sai_waitq);
wake_up(&thread->t_ctl_waitq);
ll_sai_put(sai);
CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n",
sai, parent);
dput(parent);
wake_up(&sai->sai_waitq);
wake_up(&sa_thread->t_ctl_waitq);
ll_sai_put(sai);
return rc;
}
/* authorize opened dir handle @key to statahead later */
/* authorize opened dir handle @key to statahead */
void ll_authorize_statahead(struct inode *dir, void *key)
{
struct ll_inode_info *lli = ll_i2info(dir);
......@@ -1230,7 +1253,7 @@ enum {
/**
* not first dirent, or is "."
*/
LS_NONE_FIRST_DE = 0,
LS_NOT_FIRST_DE = 0,
/**
* the first non-hidden dirent
*/
......@@ -1241,6 +1264,7 @@ enum {
LS_FIRST_DOT_DE
};
/* file is first dirent under @dir */
static int is_first_dirent(struct inode *dir, struct dentry *dentry)
{
const struct qstr *target = &dentry->d_name;
......@@ -1248,7 +1272,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
struct page *page;
__u64 pos = 0;
int dot_de;
int rc = LS_NONE_FIRST_DE;
int rc = LS_NOT_FIRST_DE;
op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
LUSTRE_OPC_ANY, dir);
......@@ -1324,7 +1348,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
if (target->len != namelen ||
memcmp(target->name, name, namelen) != 0)
rc = LS_NONE_FIRST_DE;
rc = LS_NOT_FIRST_DE;
else if (!dot_de)
rc = LS_FIRST_DE;
else
......@@ -1356,13 +1380,27 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
return rc;
}
/**
* revalidate @dentryp from statahead cache
*
* \param[in] dir parent directory
* \param[in] sai sai structure
* \param[out] dentryp pointer to dentry which will be revalidated
* \param[in] unplug unplug statahead window only (normally for negative
* dentry)
* \retval 1 on success, dentry is saved in @dentryp
* \retval 0 if revalidation failed (no proper lock on client)
* \retval negative number upon error
*/
static int revalidate_statahead_dentry(struct inode *dir,
struct ll_statahead_info *sai,
struct dentry **dentryp,
int only_unplug)
bool unplug)
{
struct sa_entry *entry = NULL;
struct l_wait_info lwi = { 0 };
struct ll_dentry_data *ldd;
struct ll_inode_info *lli;
int rc = 0;
if ((*dentryp)->d_name.name[0] == '.') {
......@@ -1392,10 +1430,15 @@ static int revalidate_statahead_dentry(struct inode *dir,
}
}
if (unplug) {
rc = 1;
goto out_unplug;
}
entry = sa_get(sai, &(*dentryp)->d_name);
if (!entry || only_unplug) {
sa_put(sai, entry);
return entry ? 1 : -EAGAIN;
if (!entry) {
rc = -EAGAIN;
goto out_unplug;
}
/* if statahead is busy in readdir, help it do post-work */
......@@ -1406,13 +1449,15 @@ static int revalidate_statahead_dentry(struct inode *dir,
sai->sai_index_wait = entry->se_index;
lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
LWI_ON_SIGNAL_NOOP, NULL);
rc = l_wait_event(sai->sai_waitq,
sa_ready(entry) ||
thread_is_stopped(&sai->sai_thread),
&lwi);
rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
if (rc < 0) {
sa_put(sai, entry);
return -EAGAIN;
/*
* entry may not be ready, so it may be used by inflight
* statahead RPC, don't free it.
*/
entry = NULL;
rc = -EAGAIN;
goto out_unplug;
}
}
......@@ -1430,10 +1475,15 @@ static int revalidate_statahead_dentry(struct inode *dir,
alias = ll_splice_alias(inode, *dentryp);
if (IS_ERR(alias)) {
sa_put(sai, entry);
return PTR_ERR(alias);
rc = PTR_ERR(alias);
goto out_unplug;
}
*dentryp = alias;
/**
* statahead prepared this inode, transfer inode
* refcount from sa_entry to dentry
*/
entry->se_inode = NULL;
} else if ((*dentryp)->d_inode != inode) {
/* revalidate, but inode is recreated */
CDEBUG(D_READA,
......@@ -1445,10 +1495,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
PFID(ll_inode2fid(inode)));
rc = -ESTALE;
goto out_unplug;
} else {
iput(inode);
}
entry->se_inode = NULL;
if ((bits & MDS_INODELOCK_LOOKUP) &&
d_lustre_invalid(*dentryp))
......@@ -1457,10 +1504,34 @@ static int revalidate_statahead_dentry(struct inode *dir,
}
}
out_unplug:
/*
* statahead cached sa_entry can be used only once, and will be killed
* right after use, so if lookup/revalidate accessed statahead cache,
* set dentry ldd_sa_generation to parent lli_sa_generation, later if we
* stat this file again, we know we've done statahead before, see
* dentry_may_statahead().
*/
ldd = ll_d2d(*dentryp);
lli = ll_i2info(dir);
/* ldd can be NULL if llite lookup failed. */
if (ldd)
ldd->lld_sa_generation = lli->lli_sa_generation;
sa_put(sai, entry);
return rc;
}
/**
* start statahead thread
*
* \param[in] dir parent directory
* \param[in] dentry dentry that triggers statahead, normally the first
* dirent under @dir
* \retval -EAGAIN on success, because when this function is
* called, it's already in lookup call, so client should
* do it itself instead of waiting for statahead thread
* to do it asynchronously.
* \retval negative number upon error
*/
static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
{
struct ll_inode_info *lli = ll_i2info(dir);
......@@ -1468,60 +1539,34 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
struct l_wait_info lwi = { 0 };
struct ptlrpc_thread *thread;
struct task_struct *task;
struct dentry *parent;
struct dentry *parent = dentry->d_parent;
int rc;
/* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
rc = is_first_dirent(dir, dentry);
if (rc == LS_NONE_FIRST_DE) {
if (rc == LS_NOT_FIRST_DE) {
/* It is not "ls -{a}l" operation, no need statahead for it. */
rc = -EAGAIN;
rc = -EFAULT;
goto out;
}
sai = ll_sai_alloc();
sai = ll_sai_alloc(parent);
if (!sai) {
rc = -ENOMEM;
goto out;
}
sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
sai->sai_inode = igrab(dir);
if (unlikely(!sai->sai_inode)) {
CWARN("Do not start stat ahead on dying inode "DFID"\n",
PFID(&lli->lli_fid));
rc = -ESTALE;
goto out;
}
/* get parent reference count here, and put it in ll_statahead_thread */
parent = dget(dentry->d_parent);
if (unlikely(sai->sai_inode != d_inode(parent))) {
struct ll_inode_info *nlli = ll_i2info(d_inode(parent));
CWARN("Race condition, someone changed %pd just now: old parent "DFID", new parent "DFID"\n",
dentry, PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
dput(parent);
iput(sai->sai_inode);
rc = -EAGAIN;
goto out;
}
CDEBUG(D_READA, "start statahead thread: sai %p, parent %pd\n",
sai, parent);
/*
* if another process started statahead thread, or deauthorized current
* lli_opendir_key, don't start statahead.
* if current lli_opendir_key was deauthorized, or dir re-opened by
* another process, don't start statahead, otherwise the newly spawned
* statahead thread won't be notified to quit.
*/
spin_lock(&lli->lli_sa_lock);
if (unlikely(lli->lli_sai || lli->lli_opendir_key ||
lli->lli_opendir_pid != current->pid)) {
spin_unlock(&lli->lli_sa_lock);
dput(parent);
iput(sai->sai_inode);
rc = -EAGAIN;
rc = -EPERM;
goto out;
}
lli->lli_sai = sai;
......@@ -1529,22 +1574,16 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
current_pid(), parent);
task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
lli->lli_opendir_pid);
thread = &sai->sai_thread;
if (IS_ERR(task)) {
rc = PTR_ERR(task);
CERROR("cannot start ll_sa thread: rc = %d\n", rc);
dput(parent);
spin_lock(&lli->lli_sa_lock);
thread_set_flags(thread, SVC_STOPPED);
thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
spin_unlock(&lli->lli_sa_lock);
ll_sai_put(sai);
LASSERT(!lli->lli_sai);
return -EAGAIN;
CERROR("can't start ll_sa thread, rc : %d\n", rc);
goto out;
}
l_wait_event(thread->t_ctl_waitq,
......@@ -1559,29 +1598,35 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
return -EAGAIN;
out:
kfree(sai);
/*
* once we start statahead thread failed, disable statahead so
* subsequent won't waste time to try it.
* that subsequent stat won't waste time to try it.
*/
spin_lock(&lli->lli_sa_lock);
lli->lli_sa_enabled = 0;
lli->lli_sai = NULL;
spin_unlock(&lli->lli_sa_lock);
if (sai)
ll_sai_free(sai);
return rc;
}
/**
* Start statahead thread if this is the first dir entry.
* Otherwise if a thread is started already, wait it until it is ahead of me.
* \retval 1 -- find entry with lock in cache, the caller needs to do
* nothing.
* \retval 0 -- find entry in cache, but without lock, the caller needs
* refresh from MDS.
* \retval others -- the caller need to process as non-statahead.
* statahead entry function, this is called when client getattr on a file, it
* will start statahead thread if this is the first dir entry, else revalidate
* dentry from statahead cache.
*
* \param[in] dir parent directory
* \param[out] dentryp dentry to getattr
* \param[in] unplug unplug statahead window only (normally for negative
* dentry)
* \retval 1 on success
* \retval 0 revalidation from statahead cache failed, caller needs
* to getattr from server directly
* \retval negative number on error, caller often ignores this and
* then getattr from server
*/
int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
int only_unplug)
int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug)
{
struct ll_statahead_info *sai;
......@@ -1589,13 +1634,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
if (sai) {
int rc;
rc = revalidate_statahead_dentry(dir, sai, dentryp,
only_unplug);
rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
CDEBUG(D_READA, "revalidate statahead %pd: %d.\n",
*dentryp, rc);
ll_sai_put(sai);
return rc;
}
return start_statahead_thread(dir, *dentryp);
}
......@@ -1367,7 +1367,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
rp_param.rp_hash64);
if (IS_ERR(page)) {
CERROR("%s: dir page locate: "DFID" at %llu: rc %ld\n",
CDEBUG(D_INFO, "%s: dir page locate: " DFID " at %llu: rc %ld\n",
exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
rp_param.rp_off, PTR_ERR(page));
rc = PTR_ERR(page);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment