Commit 70db4f36 authored by Yan, Zheng; committed by Ilya Dryomov

ceph: introduce a new inode flag indicating if cached dentries are ordered

After creating/deleting/renaming a file, the offsets of sibling dentries may
change, so we cannot use cached dentries to satisfy readdir. But we can
still use the cached dentries to conclude -ENOENT for lookup.

This patch introduces a new inode flag indicating if child dentries are
ordered. The flag is set at the same time a directory is marked complete.
After creating/deleting/renaming a file, we clear the flag on the directory
inode. This prevents ceph_readdir() from using cached dentries to satisfy
the readdir syscall.
Signed-off-by: Yan, Zheng <zyan@redhat.com>
parent 4965fc38
...@@ -183,7 +183,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, ...@@ -183,7 +183,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
spin_unlock(&parent->d_lock); spin_unlock(&parent->d_lock);
/* make sure a dentry wasn't dropped while we didn't have parent lock */ /* make sure a dentry wasn't dropped while we didn't have parent lock */
if (!ceph_dir_is_complete(dir)) { if (!ceph_dir_is_complete_ordered(dir)) {
dout(" lost dir complete on %p; falling back to mds\n", dir); dout(" lost dir complete on %p; falling back to mds\n", dir);
dput(dentry); dput(dentry);
err = -EAGAIN; err = -EAGAIN;
...@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* always start with . and .. */ /* always start with . and .. */
if (ctx->pos == 0) { if (ctx->pos == 0) {
/* note dir version at start of readdir so we can tell
* if any dentries get dropped */
fi->dir_release_count = atomic_read(&ci->i_release_count);
dout("readdir off 0 -> '.'\n"); dout("readdir off 0 -> '.'\n");
if (!dir_emit(ctx, ".", 1, if (!dir_emit(ctx, ".", 1,
ceph_translate_ino(inode->i_sb, inode->i_ino), ceph_translate_ino(inode->i_sb, inode->i_ino),
...@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
if ((ctx->pos == 2 || fi->dentry) && if ((ctx->pos == 2 || fi->dentry) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR && ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete(ci) && __ceph_dir_is_complete_ordered(ci) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
u32 shared_gen = ci->i_shared_gen; u32 shared_gen = ci->i_shared_gen;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* proceed with a normal readdir */ /* proceed with a normal readdir */
if (ctx->pos == 2) {
/* note dir version at start of readdir so we can tell
* if any dentries get dropped */
fi->dir_release_count = atomic_read(&ci->i_release_count);
fi->dir_ordered_count = ci->i_ordered_count;
}
more: more:
/* do we have the correct frag content buffered? */ /* do we have the correct frag content buffered? */
if (fi->frag != frag || fi->last_readdir == NULL) { if (fi->frag != frag || fi->last_readdir == NULL) {
...@@ -446,8 +449,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -446,8 +449,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
*/ */
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
dout(" marking %p complete\n", inode); if (ci->i_ordered_count == fi->dir_ordered_count)
__ceph_dir_set_complete(ci, fi->dir_release_count); dout(" marking %p complete and ordered\n", inode);
else
dout(" marking %p complete\n", inode);
__ceph_dir_set_complete(ci, fi->dir_release_count,
fi->dir_ordered_count);
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
......
...@@ -389,6 +389,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ...@@ -389,6 +389,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_version = 0; ci->i_version = 0;
ci->i_time_warp_seq = 0; ci->i_time_warp_seq = 0;
ci->i_ceph_flags = 0; ci->i_ceph_flags = 0;
ci->i_ordered_count = 0;
atomic_set(&ci->i_release_count, 1); atomic_set(&ci->i_release_count, 1);
atomic_set(&ci->i_complete_count, 0); atomic_set(&ci->i_complete_count, 0);
ci->i_symlink = NULL; ci->i_symlink = NULL;
...@@ -845,7 +846,8 @@ static int fill_inode(struct inode *inode, ...@@ -845,7 +846,8 @@ static int fill_inode(struct inode *inode,
(issued & CEPH_CAP_FILE_EXCL) == 0 && (issued & CEPH_CAP_FILE_EXCL) == 0 &&
!__ceph_dir_is_complete(ci)) { !__ceph_dir_is_complete(ci)) {
dout(" marking %p complete (empty)\n", inode); dout(" marking %p complete (empty)\n", inode);
__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
ci->i_ordered_count);
} }
/* were we issued a capability? */ /* were we issued a capability? */
...@@ -1206,8 +1208,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -1206,8 +1208,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
ceph_invalidate_dentry_lease(dn); ceph_invalidate_dentry_lease(dn);
/* d_move screws up sibling dentries' offsets */ /* d_move screws up sibling dentries' offsets */
ceph_dir_clear_complete(dir); ceph_dir_clear_ordered(dir);
ceph_dir_clear_complete(olddir); ceph_dir_clear_ordered(olddir);
dout("dn %p gets new offset %lld\n", req->r_old_dentry, dout("dn %p gets new offset %lld\n", req->r_old_dentry,
ceph_dentry(req->r_old_dentry)->offset); ceph_dentry(req->r_old_dentry)->offset);
...@@ -1219,6 +1221,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -1219,6 +1221,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
if (!rinfo->head->is_target) { if (!rinfo->head->is_target) {
dout("fill_trace null dentry\n"); dout("fill_trace null dentry\n");
if (dn->d_inode) { if (dn->d_inode) {
ceph_dir_clear_ordered(dir);
dout("d_delete %p\n", dn); dout("d_delete %p\n", dn);
d_delete(dn); d_delete(dn);
} else { } else {
...@@ -1235,7 +1238,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -1235,7 +1238,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
/* attach proper inode */ /* attach proper inode */
if (!dn->d_inode) { if (!dn->d_inode) {
ceph_dir_clear_complete(dir); ceph_dir_clear_ordered(dir);
ihold(in); ihold(in);
dn = splice_dentry(dn, in, &have_lease); dn = splice_dentry(dn, in, &have_lease);
if (IS_ERR(dn)) { if (IS_ERR(dn)) {
...@@ -1265,7 +1268,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -1265,7 +1268,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
BUG_ON(!dir); BUG_ON(!dir);
BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
dout(" linking snapped dir %p to dn %p\n", in, dn); dout(" linking snapped dir %p to dn %p\n", in, dn);
ceph_dir_clear_complete(dir); ceph_dir_clear_ordered(dir);
ihold(in); ihold(in);
dn = splice_dentry(dn, in, NULL); dn = splice_dentry(dn, in, NULL);
if (IS_ERR(dn)) { if (IS_ERR(dn)) {
......
...@@ -256,6 +256,7 @@ struct ceph_inode_info { ...@@ -256,6 +256,7 @@ struct ceph_inode_info {
u32 i_time_warp_seq; u32 i_time_warp_seq;
unsigned i_ceph_flags; unsigned i_ceph_flags;
int i_ordered_count;
atomic_t i_release_count; atomic_t i_release_count;
atomic_t i_complete_count; atomic_t i_complete_count;
...@@ -434,14 +435,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, ...@@ -434,14 +435,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
/* /*
* Ceph inode. * Ceph inode.
*/ */
#define CEPH_I_NODELAY 4 /* do not delay cap release */ #define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */
#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ #define CEPH_I_NODELAY 4 /* do not delay cap release */
#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ #define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
int release_count) int release_count, int ordered_count)
{ {
atomic_set(&ci->i_complete_count, release_count); atomic_set(&ci->i_complete_count, release_count);
if (ci->i_ordered_count == ordered_count)
ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
else
ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
} }
static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
...@@ -455,16 +461,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci) ...@@ -455,16 +461,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
atomic_read(&ci->i_release_count); atomic_read(&ci->i_release_count);
} }
/*
 * Report whether a directory's cached dentries are both complete and
 * ordered: true only when __ceph_dir_is_complete() holds for @ci and
 * CEPH_I_DIR_ORDERED is set in ci->i_ceph_flags.
 *
 * This helper takes no lock itself.
 * NOTE(review): presumably the caller must hold ci->i_ceph_lock, as the
 * wrapper ceph_dir_is_complete_ordered() does -- confirm.
 */
static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
{
return __ceph_dir_is_complete(ci) &&
(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
}
static inline void ceph_dir_clear_complete(struct inode *inode) static inline void ceph_dir_clear_complete(struct inode *inode)
{ {
__ceph_dir_clear_complete(ceph_inode(inode)); __ceph_dir_clear_complete(ceph_inode(inode));
} }
static inline bool ceph_dir_is_complete(struct inode *inode) static inline void ceph_dir_clear_ordered(struct inode *inode)
{ {
return __ceph_dir_is_complete(ceph_inode(inode)); struct ceph_inode_info *ci = ceph_inode(inode);
spin_lock(&ci->i_ceph_lock);
ci->i_ordered_count++;
ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
spin_unlock(&ci->i_ceph_lock);
} }
/*
 * Locked variant of __ceph_dir_is_complete_ordered() that starts from a
 * struct inode: looks up the ceph inode info, takes ci->i_ceph_lock around
 * the check, and returns the result.
 */
static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
bool ret;
/* sample the complete/ordered state while holding i_ceph_lock */
spin_lock(&ci->i_ceph_lock);
ret = __ceph_dir_is_complete_ordered(ci);
spin_unlock(&ci->i_ceph_lock);
return ret;
}
/* find a specific frag @f */ /* find a specific frag @f */
extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
...@@ -580,6 +605,7 @@ struct ceph_file_info { ...@@ -580,6 +605,7 @@ struct ceph_file_info {
char *last_name; /* last entry in previous chunk */ char *last_name; /* last entry in previous chunk */
struct dentry *dentry; /* next dentry (for dcache readdir) */ struct dentry *dentry; /* next dentry (for dcache readdir) */
int dir_release_count; int dir_release_count;
int dir_ordered_count;
/* used for -o dirstat read() on directory thing */ /* used for -o dirstat read() on directory thing */
char *dir_info; char *dir_info;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment