Commit f3c4ebe6 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: using hash value to compose dentry offset

If MDS sorts dentries in dirfrag in hash order, we use hash value to
compose dentry offset. dentry offset is:

  (0xff << 52) | ((24 bits hash) << 28) |
  (the nth entry hash hash collision)

This offset is stable across directory fragmentation. This alos means
there is no need to reset readdir offset if directory get fragmented
in the middle of readdir.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 076c40f1
...@@ -69,16 +69,42 @@ int ceph_init_dentry(struct dentry *dentry) ...@@ -69,16 +69,42 @@ int ceph_init_dentry(struct dentry *dentry)
} }
/* /*
* for readdir, we encode the directory frag and offset within that * for f_pos for readdir:
* frag into f_pos. * - hash order:
* (0xff << 52) | ((24 bits hash) << 28) |
* (the nth entry has hash collision);
* - frag+name order;
* ((frag value) << 28) | (the nth entry in frag);
*/ */
#define OFFSET_BITS 28
#define OFFSET_MASK ((1 << OFFSET_BITS) - 1)
#define HASH_ORDER (0xffull << (OFFSET_BITS + 24))
loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
{
loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
if (hash_order)
fpos |= HASH_ORDER;
return fpos;
}
static bool is_hash_order(loff_t p)
{
return (p & HASH_ORDER) == HASH_ORDER;
}
static unsigned fpos_frag(loff_t p) static unsigned fpos_frag(loff_t p)
{ {
return p >> 32; return p >> OFFSET_BITS;
} }
static unsigned fpos_hash(loff_t p)
{
return ceph_frag_value(fpos_frag(p));
}
static unsigned fpos_off(loff_t p) static unsigned fpos_off(loff_t p)
{ {
return p & 0xffffffff; return p & OFFSET_MASK;
} }
static int fpos_cmp(loff_t l, loff_t r) static int fpos_cmp(loff_t l, loff_t r)
...@@ -177,7 +203,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, ...@@ -177,7 +203,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
u64 idx = 0; u64 idx = 0;
int err = 0; int err = 0;
dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos); dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
/* search start position */ /* search start position */
if (ctx->pos > 2) { if (ctx->pos > 2) {
...@@ -234,7 +260,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, ...@@ -234,7 +260,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
spin_unlock(&dentry->d_lock); spin_unlock(&dentry->d_lock);
if (emit_dentry) { if (emit_dentry) {
dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, dout(" %llx dentry %p %pd %p\n", di->offset,
dentry, dentry, d_inode(dentry)); dentry, dentry, d_inode(dentry));
ctx->pos = di->offset; ctx->pos = di->offset;
if (!dir_emit(ctx, dentry->d_name.name, if (!dir_emit(ctx, dentry->d_name.name,
...@@ -269,6 +295,16 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, ...@@ -269,6 +295,16 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
return err; return err;
} }
static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
{
if (!fi->last_readdir)
return true;
if (is_hash_order(pos))
return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
else
return fi->frag != fpos_frag(pos);
}
static int ceph_readdir(struct file *file, struct dir_context *ctx) static int ceph_readdir(struct file *file, struct dir_context *ctx)
{ {
struct ceph_file_info *fi = file->private_data; struct ceph_file_info *fi = file->private_data;
...@@ -276,7 +312,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -276,7 +312,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_client *mdsc = fsc->mdsc;
unsigned frag = fpos_frag(ctx->pos);
int i; int i;
int err; int err;
u32 ftype; u32 ftype;
...@@ -317,7 +352,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -317,7 +352,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
err = __dcache_readdir(file, ctx, shared_gen); err = __dcache_readdir(file, ctx, shared_gen);
if (err != -EAGAIN) if (err != -EAGAIN)
return err; return err;
frag = fpos_frag(ctx->pos);
} else { } else {
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
} }
...@@ -325,8 +359,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -325,8 +359,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* proceed with a normal readdir */ /* proceed with a normal readdir */
more: more:
/* do we have the correct frag content buffered? */ /* do we have the correct frag content buffered? */
if (fi->frag != frag || fi->last_readdir == NULL) { if (need_send_readdir(fi, ctx->pos)) {
struct ceph_mds_request *req; struct ceph_mds_request *req;
unsigned frag;
int op = ceph_snap(inode) == CEPH_SNAPDIR ? int op = ceph_snap(inode) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
...@@ -336,6 +371,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -336,6 +371,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
fi->last_readdir = NULL; fi->last_readdir = NULL;
} }
if (is_hash_order(ctx->pos)) {
frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
NULL, NULL);
} else {
frag = fpos_frag(ctx->pos);
}
dout("readdir fetching %llx.%llx frag %x offset '%s'\n", dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
ceph_vinop(inode), frag, fi->last_name); ceph_vinop(inode), frag, fi->last_name);
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
...@@ -373,19 +415,23 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -373,19 +415,23 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ceph_mdsc_put_request(req); ceph_mdsc_put_request(req);
return err; return err;
} }
dout("readdir got and parsed readdir result=%d" dout("readdir got and parsed readdir result=%d on "
" on frag %x, end=%d, complete=%d\n", err, frag, "frag %x, end=%d, complete=%d, hash_order=%d\n",
err, frag,
(int)req->r_reply_info.dir_end, (int)req->r_reply_info.dir_end,
(int)req->r_reply_info.dir_complete); (int)req->r_reply_info.dir_complete,
(int)req->r_reply_info.hash_order);
/* note next offset and last dentry name */
rinfo = &req->r_reply_info; rinfo = &req->r_reply_info;
if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
frag = le32_to_cpu(rinfo->dir_dir->frag); frag = le32_to_cpu(rinfo->dir_dir->frag);
fi->next_offset = req->r_readdir_offset; if (!rinfo->hash_order) {
/* adjust ctx->pos to beginning of frag */ fi->next_offset = req->r_readdir_offset;
ctx->pos = ceph_make_fpos(frag, fi->next_offset); /* adjust ctx->pos to beginning of frag */
ctx->pos = ceph_make_fpos(frag,
fi->next_offset,
false);
}
} }
fi->frag = frag; fi->frag = frag;
...@@ -411,23 +457,25 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -411,23 +457,25 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
fi->dir_release_count = 0; fi->dir_release_count = 0;
} }
if (req->r_reply_info.dir_end) { /* note next offset and last dentry name */
kfree(fi->last_name); if (rinfo->dir_nr > 0) {
fi->last_name = NULL;
fi->next_offset = 2;
} else {
struct ceph_mds_reply_dir_entry *rde = struct ceph_mds_reply_dir_entry *rde =
rinfo->dir_entries + (rinfo->dir_nr-1); rinfo->dir_entries + (rinfo->dir_nr-1);
unsigned next_offset = req->r_reply_info.dir_end ?
2 : (fpos_off(rde->offset) + 1);
err = note_last_dentry(fi, rde->name, rde->name_len, err = note_last_dentry(fi, rde->name, rde->name_len,
fpos_off(rde->offset) + 1); next_offset);
if (err) if (err)
return err; return err;
} else if (req->r_reply_info.dir_end) {
fi->next_offset = 2;
/* keep last name */
} }
} }
rinfo = &fi->last_readdir->r_reply_info; rinfo = &fi->last_readdir->r_reply_info;
dout("readdir frag %x num %d pos %llx chunk first %llx\n", dout("readdir frag %x num %d pos %llx chunk first %llx\n",
frag, rinfo->dir_nr, ctx->pos, fi->frag, rinfo->dir_nr, ctx->pos,
rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL); rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
i = 0; i = 0;
...@@ -470,16 +518,26 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -470,16 +518,26 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ctx->pos++; ctx->pos++;
} }
if (fi->last_name) { if (fi->next_offset > 2) {
ceph_mdsc_put_request(fi->last_readdir); ceph_mdsc_put_request(fi->last_readdir);
fi->last_readdir = NULL; fi->last_readdir = NULL;
goto more; goto more;
} }
/* more frags? */ /* more frags? */
if (!ceph_frag_is_rightmost(frag)) { if (!ceph_frag_is_rightmost(fi->frag)) {
frag = ceph_frag_next(frag); unsigned frag = ceph_frag_next(fi->frag);
ctx->pos = ceph_make_fpos(frag, 2); if (is_hash_order(ctx->pos)) {
loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
fi->next_offset, true);
if (new_pos > ctx->pos)
ctx->pos = new_pos;
/* keep last_name */
} else {
ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
kfree(fi->last_name);
fi->last_name = NULL;
}
dout("readdir next frag is %x\n", frag); dout("readdir next frag is %x\n", frag);
goto more; goto more;
} }
...@@ -532,14 +590,21 @@ static void reset_readdir(struct ceph_file_info *fi) ...@@ -532,14 +590,21 @@ static void reset_readdir(struct ceph_file_info *fi)
static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos) static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
{ {
struct ceph_mds_reply_info_parsed *rinfo; struct ceph_mds_reply_info_parsed *rinfo;
loff_t chunk_offset;
if (new_pos == 0) if (new_pos == 0)
return true; return true;
if (fpos_frag(new_pos) != fi->frag) if (is_hash_order(new_pos)) {
/* no need to reset last_name for a forward seek when
* dentries are sotred in hash order */
} else if (fi->frag |= fpos_frag(new_pos)) {
return true; return true;
}
rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL; rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
if (!rinfo || !rinfo->dir_nr) if (!rinfo || !rinfo->dir_nr)
return true; return true;
return new_pos < rinfo->dir_entries[0].offset;; chunk_offset = rinfo->dir_entries[0].offset;
return new_pos < chunk_offset ||
is_hash_order(new_pos) != is_hash_order(chunk_offset);
} }
static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
...@@ -562,17 +627,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) ...@@ -562,17 +627,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
} }
if (offset >= 0) { if (offset >= 0) {
if (need_reset_readdir(fi, offset)) {
dout("dir_llseek dropping %p content\n", file);
reset_readdir(fi);
} else if (is_hash_order(offset) && offset > file->f_pos) {
/* for hash offset, we don't know if a forward seek
* is within same frag */
fi->dir_release_count = 0;
fi->readdir_cache_idx = -1;
}
if (offset != file->f_pos) { if (offset != file->f_pos) {
file->f_pos = offset; file->f_pos = offset;
file->f_version = 0; file->f_version = 0;
fi->flags &= ~CEPH_F_ATEND; fi->flags &= ~CEPH_F_ATEND;
} }
retval = offset; retval = offset;
if (need_reset_readdir(fi, offset)) {
dout("dir_llseek dropping %p content\n", file);
reset_readdir(fi);
}
} }
out: out:
inode_unlock(inode); inode_unlock(inode);
......
...@@ -1387,6 +1387,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1387,6 +1387,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
struct ceph_mds_session *session) struct ceph_mds_session *session)
{ {
struct dentry *parent = req->r_dentry; struct dentry *parent = req->r_dentry;
struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
struct qstr dname; struct qstr dname;
struct dentry *dn; struct dentry *dn;
...@@ -1394,19 +1395,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1394,19 +1395,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
int err = 0, skipped = 0, ret, i; int err = 0, skipped = 0, ret, i;
struct inode *snapdir = NULL; struct inode *snapdir = NULL;
struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
struct ceph_dentry_info *di;
u32 frag = le32_to_cpu(rhead->args.readdir.frag); u32 frag = le32_to_cpu(rhead->args.readdir.frag);
u32 last_hash = 0;
u32 fpos_offset;
struct ceph_readdir_cache_control cache_ctl = {}; struct ceph_readdir_cache_control cache_ctl = {};
if (req->r_aborted) if (req->r_aborted)
return readdir_prepopulate_inodes_only(req, session); return readdir_prepopulate_inodes_only(req, session);
if (rinfo->hash_order && req->r_path2) {
last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
req->r_path2, strlen(req->r_path2));
last_hash = ceph_frag_value(last_hash);
}
if (rinfo->dir_dir && if (rinfo->dir_dir &&
le32_to_cpu(rinfo->dir_dir->frag) != frag) { le32_to_cpu(rinfo->dir_dir->frag) != frag) {
dout("readdir_prepopulate got new frag %x -> %x\n", dout("readdir_prepopulate got new frag %x -> %x\n",
frag, le32_to_cpu(rinfo->dir_dir->frag)); frag, le32_to_cpu(rinfo->dir_dir->frag));
frag = le32_to_cpu(rinfo->dir_dir->frag); frag = le32_to_cpu(rinfo->dir_dir->frag);
req->r_readdir_offset = 2; if (!rinfo->hash_order)
req->r_readdir_offset = 2;
} }
if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) { if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
...@@ -1424,13 +1433,13 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1424,13 +1433,13 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) { if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
/* note dir version at start of readdir so we can tell /* note dir version at start of readdir so we can tell
* if any dentries get dropped */ * if any dentries get dropped */
struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count); req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
req->r_readdir_cache_idx = 0; req->r_readdir_cache_idx = 0;
} }
cache_ctl.index = req->r_readdir_cache_idx; cache_ctl.index = req->r_readdir_cache_idx;
fpos_offset = req->r_readdir_offset;
/* FIXME: release caps/leases if error occurs */ /* FIXME: release caps/leases if error occurs */
for (i = 0; i < rinfo->dir_nr; i++) { for (i = 0; i < rinfo->dir_nr; i++) {
...@@ -1444,6 +1453,18 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1444,6 +1453,18 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
vino.ino = le64_to_cpu(rde->inode.in->ino); vino.ino = le64_to_cpu(rde->inode.in->ino);
vino.snap = le64_to_cpu(rde->inode.in->snapid); vino.snap = le64_to_cpu(rde->inode.in->snapid);
if (rinfo->hash_order) {
u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
rde->name, rde->name_len);
hash = ceph_frag_value(hash);
if (hash != last_hash)
fpos_offset = 2;
last_hash = hash;
rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
} else {
rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
}
retry_lookup: retry_lookup:
dn = d_lookup(parent, &dname); dn = d_lookup(parent, &dname);
dout("d_lookup on parent=%p name=%.*s got %p\n", dout("d_lookup on parent=%p name=%.*s got %p\n",
...@@ -1521,9 +1542,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1521,9 +1542,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
dn = realdn; dn = realdn;
} }
di = dn->d_fsdata; ceph_dentry(dn)->offset = rde->offset;
di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
rde->offset = di->offset;
update_dentry_lease(dn, rde->lease, req->r_session, update_dentry_lease(dn, rde->lease, req->r_session,
req->r_request_started); req->r_request_started);
......
...@@ -185,6 +185,7 @@ static int parse_reply_info_dir(void **p, void *end, ...@@ -185,6 +185,7 @@ static int parse_reply_info_dir(void **p, void *end,
u16 flags = ceph_decode_16(p); u16 flags = ceph_decode_16(p);
info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
} }
if (num == 0) if (num == 0)
goto done; goto done;
......
...@@ -81,7 +81,9 @@ struct ceph_mds_reply_info_parsed { ...@@ -81,7 +81,9 @@ struct ceph_mds_reply_info_parsed {
struct ceph_mds_reply_dirfrag *dir_dir; struct ceph_mds_reply_dirfrag *dir_dir;
size_t dir_buf_size; size_t dir_buf_size;
int dir_nr; int dir_nr;
bool dir_complete, dir_end; bool dir_complete;
bool dir_end;
bool hash_order;
struct ceph_mds_reply_dir_entry *dir_entries; struct ceph_mds_reply_dir_entry *dir_entries;
}; };
......
...@@ -540,11 +540,6 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry) ...@@ -540,11 +540,6 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
return (struct ceph_dentry_info *)dentry->d_fsdata; return (struct ceph_dentry_info *)dentry->d_fsdata;
} }
static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
{
return ((loff_t)frag << 32) | (loff_t)off;
}
/* /*
* caps helpers * caps helpers
*/ */
...@@ -949,6 +944,7 @@ extern const struct inode_operations ceph_snapdir_iops; ...@@ -949,6 +944,7 @@ extern const struct inode_operations ceph_snapdir_iops;
extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
ceph_snapdir_dentry_ops; ceph_snapdir_dentry_ops;
extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);
extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry); extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
extern int ceph_handle_snapdir(struct ceph_mds_request *req, extern int ceph_handle_snapdir(struct ceph_mds_request *req,
struct dentry *dentry, int err); struct dentry *dentry, int err);
......
...@@ -357,6 +357,7 @@ extern const char *ceph_mds_op_name(int op); ...@@ -357,6 +357,7 @@ extern const char *ceph_mds_op_name(int op);
*/ */
#define CEPH_READDIR_FRAG_END (1<<0) #define CEPH_READDIR_FRAG_END (1<<0)
#define CEPH_READDIR_FRAG_COMPLETE (1<<8) #define CEPH_READDIR_FRAG_COMPLETE (1<<8)
#define CEPH_READDIR_HASH_ORDER (1<<9)
union ceph_mds_request_args { union ceph_mds_request_args {
struct { struct {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment