Commit 779fe0fb authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: rados pool namespace support

This patch adds codes that decode pool namespace information in
cap message and request reply. Pool namespace is saved in i_layout,
it will be passed to libceph when doing read/write.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent cd08e0a2
...@@ -1730,7 +1730,8 @@ enum { ...@@ -1730,7 +1730,8 @@ enum {
POOL_WRITE = 2, POOL_WRITE = 2,
}; };
static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
s64 pool, struct ceph_string *pool_ns)
{ {
struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_client *mdsc = fsc->mdsc;
...@@ -1738,6 +1739,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1738,6 +1739,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
struct rb_node **p, *parent; struct rb_node **p, *parent;
struct ceph_pool_perm *perm; struct ceph_pool_perm *perm;
struct page **pages; struct page **pages;
size_t pool_ns_len;
int err = 0, err2 = 0, have = 0; int err = 0, err2 = 0, have = 0;
down_read(&mdsc->pool_perm_rwsem); down_read(&mdsc->pool_perm_rwsem);
...@@ -1748,18 +1750,32 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1748,18 +1750,32 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
p = &(*p)->rb_left; p = &(*p)->rb_left;
else if (pool > perm->pool) else if (pool > perm->pool)
p = &(*p)->rb_right; p = &(*p)->rb_right;
else {
int ret = ceph_compare_string(pool_ns,
perm->pool_ns,
perm->pool_ns_len);
if (ret < 0)
p = &(*p)->rb_left;
else if (ret > 0)
p = &(*p)->rb_right;
else { else {
have = perm->perm; have = perm->perm;
break; break;
} }
} }
}
up_read(&mdsc->pool_perm_rwsem); up_read(&mdsc->pool_perm_rwsem);
if (*p) if (*p)
goto out; goto out;
if (pool_ns)
dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
pool, (int)pool_ns->len, pool_ns->str);
else
dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool); dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
down_write(&mdsc->pool_perm_rwsem); down_write(&mdsc->pool_perm_rwsem);
p = &mdsc->pool_perm_tree.rb_node;
parent = NULL; parent = NULL;
while (*p) { while (*p) {
parent = *p; parent = *p;
...@@ -1768,11 +1784,20 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1768,11 +1784,20 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
p = &(*p)->rb_left; p = &(*p)->rb_left;
else if (pool > perm->pool) else if (pool > perm->pool)
p = &(*p)->rb_right; p = &(*p)->rb_right;
else {
int ret = ceph_compare_string(pool_ns,
perm->pool_ns,
perm->pool_ns_len);
if (ret < 0)
p = &(*p)->rb_left;
else if (ret > 0)
p = &(*p)->rb_right;
else { else {
have = perm->perm; have = perm->perm;
break; break;
} }
} }
}
if (*p) { if (*p) {
up_write(&mdsc->pool_perm_rwsem); up_write(&mdsc->pool_perm_rwsem);
goto out; goto out;
...@@ -1788,6 +1813,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1788,6 +1813,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
rd_req->r_flags = CEPH_OSD_FLAG_READ; rd_req->r_flags = CEPH_OSD_FLAG_READ;
osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
rd_req->r_base_oloc.pool = pool; rd_req->r_base_oloc.pool = pool;
if (pool_ns)
rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino); ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS); err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
...@@ -1841,7 +1868,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1841,7 +1868,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
goto out_unlock; goto out_unlock;
} }
perm = kmalloc(sizeof(*perm), GFP_NOFS); pool_ns_len = pool_ns ? pool_ns->len : 0;
perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
if (!perm) { if (!perm) {
err = -ENOMEM; err = -ENOMEM;
goto out_unlock; goto out_unlock;
...@@ -1849,6 +1877,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1849,6 +1877,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
perm->pool = pool; perm->pool = pool;
perm->perm = have; perm->perm = have;
perm->pool_ns_len = pool_ns_len;
if (pool_ns_len > 0)
memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
perm->pool_ns[pool_ns_len] = 0;
rb_link_node(&perm->node, parent, p); rb_link_node(&perm->node, parent, p);
rb_insert_color(&perm->node, &mdsc->pool_perm_tree); rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
err = 0; err = 0;
...@@ -1860,6 +1893,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1860,6 +1893,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
out: out:
if (!err) if (!err)
err = have; err = have;
if (pool_ns)
dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
pool, (int)pool_ns->len, pool_ns->str, err);
else
dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err); dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
return err; return err;
} }
...@@ -1867,12 +1904,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) ...@@ -1867,12 +1904,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{ {
s64 pool; s64 pool;
struct ceph_string *pool_ns;
int ret, flags; int ret, flags;
/* does not support pool namespace yet */
if (ci->i_pool_ns_len)
return -EIO;
if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
NOPOOLPERM)) NOPOOLPERM))
return 0; return 0;
...@@ -1896,7 +1930,9 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) ...@@ -1896,7 +1930,9 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
return 0; return 0;
} }
ret = __ceph_pool_perm_get(ci, pool); pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
ret = __ceph_pool_perm_get(ci, pool, pool_ns);
ceph_put_string(pool_ns);
if (ret < 0) if (ret < 0)
return ret; return ret;
...@@ -1907,8 +1943,9 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) ...@@ -1907,8 +1943,9 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
flags |= CEPH_I_POOL_WR; flags |= CEPH_I_POOL_WR;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (pool == ci->i_layout.pool_id) { if (pool == ci->i_layout.pool_id &&
ci->i_ceph_flags = flags; pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
ci->i_ceph_flags |= flags;
} else { } else {
pool = ci->i_layout.pool_id; pool = ci->i_layout.pool_id;
flags = ci->i_ceph_flags; flags = ci->i_ceph_flags;
......
...@@ -2779,12 +2779,11 @@ static void invalidate_aliases(struct inode *inode) ...@@ -2779,12 +2779,11 @@ static void invalidate_aliases(struct inode *inode)
*/ */
static void handle_cap_grant(struct ceph_mds_client *mdsc, static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant, struct inode *inode, struct ceph_mds_caps *grant,
u64 inline_version, struct ceph_string **pns, u64 inline_version,
void *inline_data, int inline_len, void *inline_data, u32 inline_len,
struct ceph_buffer *xattr_buf, struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_cap *cap, int issued, struct ceph_cap *cap, int issued)
u32 pool_ns_len)
__releases(ci->i_ceph_lock) __releases(ci->i_ceph_lock)
__releases(mdsc->snap_rwsem) __releases(mdsc->snap_rwsem)
{ {
...@@ -2896,11 +2895,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, ...@@ -2896,11 +2895,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
/* file layout may have changed */ /* file layout may have changed */
s64 old_pool = ci->i_layout.pool_id; s64 old_pool = ci->i_layout.pool_id;
struct ceph_string *old_ns;
ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
ci->i_pool_ns_len = pool_ns_len; old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
if (ci->i_layout.pool_id != old_pool) lockdep_is_held(&ci->i_ceph_lock));
rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
*pns = old_ns;
/* size/truncate_seq? */ /* size/truncate_seq? */
queue_trunc = ceph_fill_file_size(inode, issued, queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(grant->truncate_seq), le32_to_cpu(grant->truncate_seq),
...@@ -3423,20 +3429,18 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3423,20 +3429,18 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_cap *cap; struct ceph_cap *cap;
struct ceph_mds_caps *h; struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL; struct ceph_mds_cap_peer *peer = NULL;
struct ceph_snap_realm *realm; struct ceph_snap_realm *realm = NULL;
struct ceph_string *pool_ns = NULL;
int mds = session->s_mds; int mds = session->s_mds;
int op, issued; int op, issued;
u32 seq, mseq; u32 seq, mseq;
struct ceph_vino vino; struct ceph_vino vino;
u64 cap_id;
u64 size, max_size;
u64 tid; u64 tid;
u64 inline_version = 0; u64 inline_version = 0;
void *inline_data = NULL; void *inline_data = NULL;
u32 inline_len = 0; u32 inline_len = 0;
void *snaptrace; void *snaptrace;
size_t snaptrace_len; size_t snaptrace_len;
u32 pool_ns_len = 0;
void *p, *end; void *p, *end;
dout("handle_caps from mds%d\n", mds); dout("handle_caps from mds%d\n", mds);
...@@ -3450,11 +3454,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3450,11 +3454,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
op = le32_to_cpu(h->op); op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino); vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP; vino.snap = CEPH_NOSNAP;
cap_id = le64_to_cpu(h->cap_id);
seq = le32_to_cpu(h->seq); seq = le32_to_cpu(h->seq);
mseq = le32_to_cpu(h->migrate_seq); mseq = le32_to_cpu(h->migrate_seq);
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);
snaptrace = h + 1; snaptrace = h + 1;
snaptrace_len = le32_to_cpu(h->snap_trace_len); snaptrace_len = le32_to_cpu(h->snap_trace_len);
...@@ -3493,6 +3494,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3493,6 +3494,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 flush_tid; u64 flush_tid;
u32 caller_uid, caller_gid; u32 caller_uid, caller_gid;
u32 osd_epoch_barrier; u32 osd_epoch_barrier;
u32 pool_ns_len;
/* version >= 5 */ /* version >= 5 */
ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad); ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
/* version >= 6 */ /* version >= 6 */
...@@ -3502,6 +3504,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3502,6 +3504,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_decode_32_safe(&p, end, caller_gid, bad); ceph_decode_32_safe(&p, end, caller_gid, bad);
/* version >= 8 */ /* version >= 8 */
ceph_decode_32_safe(&p, end, pool_ns_len, bad); ceph_decode_32_safe(&p, end, pool_ns_len, bad);
if (pool_ns_len > 0) {
ceph_decode_need(&p, end, pool_ns_len, bad);
pool_ns = ceph_find_or_create_string(p, pool_ns_len);
p += pool_ns_len;
}
} }
/* lookup ino */ /* lookup ino */
...@@ -3522,7 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3522,7 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap = ceph_get_cap(mdsc, NULL); cap = ceph_get_cap(mdsc, NULL);
cap->cap_ino = vino.ino; cap->cap_ino = vino.ino;
cap->queue_release = 1; cap->queue_release = 1;
cap->cap_id = cap_id; cap->cap_id = le64_to_cpu(h->cap_id);
cap->mseq = mseq; cap->mseq = mseq;
cap->seq = seq; cap->seq = seq;
spin_lock(&session->s_cap_lock); spin_lock(&session->s_cap_lock);
...@@ -3557,10 +3564,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3557,10 +3564,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
} }
handle_cap_import(mdsc, inode, h, peer, session, handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued); &cap, &issued);
handle_cap_grant(mdsc, inode, h, handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len, inline_version, inline_data, inline_len,
msg->middle, session, cap, issued, msg->middle, session, cap, issued);
pool_ns_len);
if (realm) if (realm)
ceph_put_snap_realm(mdsc, realm); ceph_put_snap_realm(mdsc, realm);
goto done_unlocked; goto done_unlocked;
...@@ -3582,10 +3588,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3582,10 +3588,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT: case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued); __ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci); issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len, inline_version, inline_data, inline_len,
msg->middle, session, cap, issued, msg->middle, session, cap, issued);
pool_ns_len);
goto done_unlocked; goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK: case CEPH_CAP_OP_FLUSH_ACK:
...@@ -3616,6 +3621,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3616,6 +3621,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
done_unlocked: done_unlocked:
iput(inode); iput(inode);
ceph_put_string(pool_ns);
return; return;
bad: bad:
......
...@@ -447,7 +447,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ...@@ -447,7 +447,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL); RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
ci->i_pool_ns_len = 0;
ci->i_fragtree = RB_ROOT; ci->i_fragtree = RB_ROOT;
mutex_init(&ci->i_fragtree_mutex); mutex_init(&ci->i_fragtree_mutex);
...@@ -571,7 +570,7 @@ void ceph_destroy_inode(struct inode *inode) ...@@ -571,7 +570,7 @@ void ceph_destroy_inode(struct inode *inode)
if (ci->i_xattrs.prealloc_blob) if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob); ceph_buffer_put(ci->i_xattrs.prealloc_blob);
ceph_put_string(ci->i_layout.pool_ns); ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
call_rcu(&inode->i_rcu, ceph_i_callback); call_rcu(&inode->i_rcu, ceph_i_callback);
} }
...@@ -736,6 +735,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -736,6 +735,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
int issued = 0, implemented, new_issued; int issued = 0, implemented, new_issued;
struct timespec mtime, atime, ctime; struct timespec mtime, atime, ctime;
struct ceph_buffer *xattr_blob = NULL; struct ceph_buffer *xattr_blob = NULL;
struct ceph_string *pool_ns = NULL;
struct ceph_cap *new_cap = NULL; struct ceph_cap *new_cap = NULL;
int err = 0; int err = 0;
bool wake = false; bool wake = false;
...@@ -763,6 +763,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -763,6 +763,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
iinfo->xattr_len); iinfo->xattr_len);
} }
if (iinfo->pool_ns_len > 0)
pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
iinfo->pool_ns_len);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
/* /*
...@@ -818,11 +822,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -818,11 +822,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (new_version || if (new_version ||
(new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
s64 old_pool = ci->i_layout.pool_id; s64 old_pool = ci->i_layout.pool_id;
struct ceph_string *old_ns;
ceph_file_layout_from_legacy(&ci->i_layout, &info->layout); ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
ci->i_pool_ns_len = iinfo->pool_ns_len; old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
if (ci->i_layout.pool_id != old_pool) lockdep_is_held(&ci->i_ceph_lock));
rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
pool_ns = old_ns;
queue_trunc = ceph_fill_file_size(inode, issued, queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(info->truncate_seq), le32_to_cpu(info->truncate_seq),
le64_to_cpu(info->truncate_size), le64_to_cpu(info->truncate_size),
...@@ -989,6 +1000,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -989,6 +1000,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
ceph_put_cap(mdsc, new_cap); ceph_put_cap(mdsc, new_cap);
if (xattr_blob) if (xattr_blob)
ceph_buffer_put(xattr_blob); ceph_buffer_put(xattr_blob);
ceph_put_string(pool_ns);
return err; return err;
} }
......
...@@ -213,9 +213,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) ...@@ -213,9 +213,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
ceph_ino(inode), dl.object_no); ceph_ino(inode), dl.object_no);
oloc.pool = ci->i_layout.pool_id; oloc.pool = ci->i_layout.pool_id;
oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
ceph_oid_printf(&oid, "%s", dl.object_name); ceph_oid_printf(&oid, "%s", dl.object_name);
r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid); r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
ceph_oloc_destroy(&oloc);
if (r < 0) { if (r < 0) {
up_read(&osdc->lock); up_read(&osdc->lock);
return r; return r;
......
...@@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end, ...@@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end,
} else } else
info->inline_version = CEPH_INLINE_NONE; info->inline_version = CEPH_INLINE_NONE;
info->pool_ns_len = 0;
info->pool_ns_data = NULL;
if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
ceph_decode_32_safe(p, end, info->pool_ns_len, bad); ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
if (info->pool_ns_len > 0) {
ceph_decode_need(p, end, info->pool_ns_len, bad); ceph_decode_need(p, end, info->pool_ns_len, bad);
info->pool_ns_data = *p;
*p += info->pool_ns_len; *p += info->pool_ns_len;
} else { }
info->pool_ns_len = 0;
} }
return 0; return 0;
...@@ -2292,14 +2295,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ...@@ -2292,14 +2295,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN); CEPH_CAP_PIN);
/* deny access to directories with pool_ns layouts */
if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
ceph_inode(req->r_inode)->i_pool_ns_len)
return -EIO;
if (req->r_locked_dir &&
ceph_inode(req->r_locked_dir)->i_pool_ns_len)
return -EIO;
/* issue */ /* issue */
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir); __register_request(mdsc, req, dir);
......
...@@ -45,6 +45,7 @@ struct ceph_mds_reply_info_in { ...@@ -45,6 +45,7 @@ struct ceph_mds_reply_info_in {
u32 inline_len; u32 inline_len;
char *inline_data; char *inline_data;
u32 pool_ns_len; u32 pool_ns_len;
char *pool_ns_data;
}; };
struct ceph_mds_reply_dir_entry { struct ceph_mds_reply_dir_entry {
...@@ -277,6 +278,8 @@ struct ceph_pool_perm { ...@@ -277,6 +278,8 @@ struct ceph_pool_perm {
struct rb_node node; struct rb_node node;
int perm; int perm;
s64 pool; s64 pool;
size_t pool_ns_len;
char pool_ns[];
}; };
/* /*
......
...@@ -287,7 +287,6 @@ struct ceph_inode_info { ...@@ -287,7 +287,6 @@ struct ceph_inode_info {
struct ceph_dir_layout i_dir_layout; struct ceph_dir_layout i_dir_layout;
struct ceph_file_layout i_layout; struct ceph_file_layout i_layout;
size_t i_pool_ns_len;
char *i_symlink; char *i_symlink;
/* for dirs */ /* for dirs */
......
...@@ -57,56 +57,69 @@ struct ceph_vxattr { ...@@ -57,56 +57,69 @@ struct ceph_vxattr {
static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
{ {
size_t s; struct ceph_file_layout *fl = &ci->i_layout;
char *p = (char *)&ci->i_layout; return (fl->stripe_unit > 0 || fl->stripe_count > 0 ||
fl->object_size > 0 || fl->pool_id >= 0 ||
for (s = 0; s < sizeof(ci->i_layout); s++, p++) rcu_dereference_raw(fl->pool_ns) != NULL);
if (*p)
return true;
return false;
} }
static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
size_t size) size_t size)
{ {
int ret;
struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
struct ceph_osd_client *osdc = &fsc->client->osdc; struct ceph_osd_client *osdc = &fsc->client->osdc;
struct ceph_string *pool_ns;
s64 pool = ci->i_layout.pool_id; s64 pool = ci->i_layout.pool_id;
const char *pool_name; const char *pool_name;
const char *ns_field = " pool_namespace=";
char buf[128]; char buf[128];
size_t len, total_len = 0;
int ret;
pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
down_read(&osdc->lock); down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name) { if (pool_name) {
size_t len = strlen(pool_name); len = snprintf(buf, sizeof(buf),
ret = snprintf(buf, sizeof(buf),
"stripe_unit=%u stripe_count=%u object_size=%u pool=", "stripe_unit=%u stripe_count=%u object_size=%u pool=",
ci->i_layout.stripe_unit, ci->i_layout.stripe_count, ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
ci->i_layout.object_size); ci->i_layout.object_size);
total_len = len + strlen(pool_name);
} else {
len = snprintf(buf, sizeof(buf),
"stripe_unit=%u stripe_count=%u object_size=%u pool=%lld",
ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
ci->i_layout.object_size, (unsigned long long)pool);
total_len = len;
}
if (pool_ns)
total_len += strlen(ns_field) + pool_ns->len;
if (!size) { if (!size) {
ret += len; ret = total_len;
} else if (ret + len > size) { } else if (total_len > size) {
ret = -ERANGE; ret = -ERANGE;
} else { } else {
memcpy(val, buf, ret); memcpy(val, buf, len);
ret = len;
if (pool_name) {
len = strlen(pool_name);
memcpy(val + ret, pool_name, len); memcpy(val + ret, pool_name, len);
ret += len; ret += len;
} }
} else { if (pool_ns) {
ret = snprintf(buf, sizeof(buf), len = strlen(ns_field);
"stripe_unit=%u stripe_count=%u object_size=%u pool=%lld", memcpy(val + ret, ns_field, len);
ci->i_layout.stripe_unit, ci->i_layout.stripe_count, ret += len;
ci->i_layout.object_size, (unsigned long long)pool); memcpy(val + ret, pool_ns->str, pool_ns->len);
if (size) { ret += pool_ns->len;
if (ret <= size)
memcpy(val, buf, ret);
else
ret = -ERANGE;
} }
} }
up_read(&osdc->lock); up_read(&osdc->lock);
ceph_put_string(pool_ns);
return ret; return ret;
} }
...@@ -147,6 +160,18 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, ...@@ -147,6 +160,18 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
return ret; return ret;
} }
static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
char *val, size_t size)
{
int ret = 0;
struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);
if (ns) {
ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str);
ceph_put_string(ns);
}
return ret;
}
/* directories */ /* directories */
static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val, static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
...@@ -235,6 +260,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { ...@@ -235,6 +260,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
XATTR_LAYOUT_FIELD(dir, layout, stripe_count), XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
XATTR_LAYOUT_FIELD(dir, layout, object_size), XATTR_LAYOUT_FIELD(dir, layout, object_size),
XATTR_LAYOUT_FIELD(dir, layout, pool), XATTR_LAYOUT_FIELD(dir, layout, pool),
XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
XATTR_NAME_CEPH(dir, entries), XATTR_NAME_CEPH(dir, entries),
XATTR_NAME_CEPH(dir, files), XATTR_NAME_CEPH(dir, files),
XATTR_NAME_CEPH(dir, subdirs), XATTR_NAME_CEPH(dir, subdirs),
...@@ -262,6 +288,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = { ...@@ -262,6 +288,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
XATTR_LAYOUT_FIELD(file, layout, stripe_count), XATTR_LAYOUT_FIELD(file, layout, stripe_count),
XATTR_LAYOUT_FIELD(file, layout, object_size), XATTR_LAYOUT_FIELD(file, layout, object_size),
XATTR_LAYOUT_FIELD(file, layout, pool), XATTR_LAYOUT_FIELD(file, layout, pool),
XATTR_LAYOUT_FIELD(file, layout, pool_namespace),
{ .name = NULL, 0 } /* Required table terminator */ { .name = NULL, 0 } /* Required table terminator */
}; };
static size_t ceph_file_vxattrs_name_size; /* total size of all names */ static size_t ceph_file_vxattrs_name_size; /* total size of all names */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment