Commit 30c156d9 authored by Yan, Zheng, committed by Ilya Dryomov

libceph: rados pool namespace support

Add a pool namespace pointer to struct ceph_file_layout and struct
ceph_object_locator. The pool namespace is used when mapping an object
to a PG; it is also used when composing OSD requests.

The namespace pointer in struct ceph_file_layout is RCU-protected, so
libceph can read the namespace without taking a lock.
Signed-off-by: Yan, Zheng <zyan@redhat.com>
[idryomov@gmail.com: ceph_oloc_destroy(), misc minor changes]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent 51e92737
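
For context, a minimal sketch (not part of the commit) of the lockless read pattern the RCU-protected pool_ns field enables: a reader takes a reference with ceph_try_get_string() instead of a lock, and later drops it with ceph_put_string(). The helper name example_get_pool_ns is made up for illustration; ceph_try_get_string() is the same helper the diff below uses in ceph_osdc_new_request().

/*
 * Sketch only: read the pool namespace of a file layout without a lock.
 * ceph_try_get_string() dereferences the __rcu pointer under
 * rcu_read_lock() and takes a reference; NULL means no namespace is set.
 */
static struct ceph_string *example_get_pool_ns(struct ceph_file_layout *layout)
{
	return ceph_try_get_string(layout->pool_ns);
	/* caller releases the reference with ceph_put_string() */
}
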
......@@ -3999,6 +3999,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
rbd_dev->layout.stripe_count = 1;
rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
rbd_dev->layout.pool_id = spec->pool_id;
RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
/*
* If this is a mapping rbd_dev (as opposed to a parent one),
......
......@@ -446,6 +446,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_symlink = NULL;
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
ci->i_pool_ns_len = 0;
ci->i_fragtree = RB_ROOT;
......@@ -570,6 +571,8 @@ void ceph_destroy_inode(struct inode *inode)
if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
ceph_put_string(ci->i_layout.pool_ns);
call_rcu(&inode->i_rcu, ceph_i_callback);
}
......
......@@ -53,6 +53,7 @@ struct ceph_file_layout_legacy {
__le32 fl_pg_pool; /* namespace, crush ruleset, rep level */
} __attribute__ ((packed));
struct ceph_string;
/*
* ceph_file_layout - describe data layout for a file/inode
*/
......@@ -62,6 +63,7 @@ struct ceph_file_layout {
u32 stripe_count; /* over this many objects */
u32 object_size; /* until objects are this big */
s64 pool_id; /* rados pool id */
struct ceph_string __rcu *pool_ns; /* rados pool namespace */
};
extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
......
......@@ -63,11 +63,13 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
struct ceph_object_locator {
s64 pool;
struct ceph_string *pool_ns;
};
static inline void ceph_oloc_init(struct ceph_object_locator *oloc)
{
oloc->pool = -1;
oloc->pool_ns = NULL;
}
static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
......@@ -75,11 +77,9 @@ static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
return oloc->pool == -1;
}
static inline void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src)
{
dest->pool = src->pool;
}
void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src);
void ceph_oloc_destroy(struct ceph_object_locator *oloc);
/*
* Maximum supported by kernel client object name length
......
......@@ -156,8 +156,16 @@ static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
seq_printf(s, "]/%d\t[", t->up.primary);
for (i = 0; i < t->acting.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
seq_printf(s, "]/%d\t", t->acting.primary);
if (t->target_oloc.pool_ns) {
seq_printf(s, "%*pE/%*pE\t0x%x",
(int)t->target_oloc.pool_ns->len,
t->target_oloc.pool_ns->str,
t->target_oid.name_len, t->target_oid.name, t->flags);
} else {
seq_printf(s, "%*pE\t0x%x", t->target_oid.name_len,
t->target_oid.name, t->flags);
}
if (t->paused)
seq_puts(s, "\tP");
}
......
......@@ -387,7 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest,
static void target_destroy(struct ceph_osd_request_target *t)
{
ceph_oid_destroy(&t->base_oid);
ceph_oloc_destroy(&t->base_oloc);
ceph_oid_destroy(&t->target_oid);
ceph_oloc_destroy(&t->target_oloc);
}
/*
......@@ -533,6 +535,11 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
{
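/* pool (8) + preferred (4) + key len (4) + nspace len (4) + nspace */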
return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
struct ceph_osd_client *osdc = req->r_osdc;
......@@ -540,11 +547,13 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
int msg_size;
WARN_ON(ceph_oid_empty(&req->r_base_oid));
WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
/* create request message */
msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
msg_size += CEPH_ENCODING_START_BLK_LEN +
ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
msg_size += 1 + 8 + 4 + 4; /* pgid */
msg_size += 4 + req->r_base_oid.name_len; /* oid */
msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
......@@ -949,6 +958,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
req->r_flags = flags;
req->r_base_oloc.pool = layout->pool_id;
req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
req->r_snapid = vino.snap;
......@@ -1489,12 +1499,16 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
p += sizeof(req->r_replay_version);
/* oloc */
ceph_encode_8(&p, 4);
ceph_encode_8(&p, 4);
ceph_encode_32(&p, 8 + 4 + 4);
ceph_start_encoding(&p, 5, 4,
ceph_oloc_encoding_size(&req->r_t.target_oloc));
ceph_encode_64(&p, req->r_t.target_oloc.pool);
ceph_encode_32(&p, -1); /* preferred */
ceph_encode_32(&p, 0); /* key len */
if (req->r_t.target_oloc.pool_ns)
ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
req->r_t.target_oloc.pool_ns->len);
else
ceph_encode_32(&p, 0);
/* pgid */
ceph_encode_8(&p, 1);
......@@ -2596,8 +2610,8 @@ static int ceph_oloc_decode(void **p, void *end,
if (struct_v >= 5) {
len = ceph_decode_32(p);
if (len > 0) {
pr_warn("ceph_object_locator::nspace is set\n");
goto e_inval;
ceph_decode_need(p, end, len, e_inval);
*p += len;
}
}
......@@ -2835,7 +2849,11 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
unlink_request(osd, req);
mutex_unlock(&osd->lock);
ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
/*
* Not ceph_oloc_copy() - changing pool_ns is not
* supported.
*/
req->r_t.target_oloc.pool = m.redirect.oloc.pool;
req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
req->r_tid = 0;
__submit_request(req, false);
......
......@@ -1510,6 +1510,24 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
return ERR_PTR(err);
}
void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src)
{
WARN_ON(!ceph_oloc_empty(dest));
WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
dest->pool = src->pool;
if (src->pool_ns)
dest->pool_ns = ceph_get_string(src->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_copy);
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);
void ceph_oid_copy(struct ceph_object_id *dest,
const struct ceph_object_id *src)
{
......@@ -1844,12 +1862,34 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
if (!pi)
return -ENOENT;
if (!oloc->pool_ns) {
raw_pgid->pool = oloc->pool;
raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
oid->name_len);
dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
raw_pgid->pool, raw_pgid->seed);
} else {
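/* the PG seed hashes "<pool namespace>" + '\037' + "<object name>" */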
char stack_buf[256];
char *buf = stack_buf;
int nsl = oloc->pool_ns->len;
size_t total = nsl + 1 + oid->name_len;
if (total > sizeof(stack_buf)) {
buf = kmalloc(total, GFP_NOIO);
if (!buf)
return -ENOMEM;
}
memcpy(buf, oloc->pool_ns->str, nsl);
buf[nsl] = '\037';
memcpy(buf + nsl + 1, oid->name, oid->name_len);
raw_pgid->pool = oloc->pool;
raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
if (buf != stack_buf)
kfree(buf);
dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
oid->name, nsl, oloc->pool_ns->str,
raw_pgid->pool, raw_pgid->seed);
}
return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
......
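
To illustrate how the pieces above fit together, a hedged usage sketch (not from the commit): it composes a ceph_object_locator from a file layout, maps the object to a raw PG with ceph_object_locator_to_pg(), and releases the namespace reference with ceph_oloc_destroy(). The helper name example_map_object, the header paths, and the exact argument order of ceph_object_locator_to_pg() are assumptions based on the hunks above.

#include <linux/ceph/osdmap.h>          /* assumed header locations */
#include <linux/ceph/string_table.h>

/*
 * Sketch only, not part of the commit: carry the pool namespace from a
 * file layout into an object locator and map the object to a raw PG.
 */
static int example_map_object(struct ceph_osdmap *osdmap,
			      struct ceph_file_layout *layout,
			      struct ceph_object_id *oid,
			      struct ceph_pg *raw_pgid)
{
	struct ceph_object_locator oloc;
	int ret;

	ceph_oloc_init(&oloc);
	oloc.pool = layout->pool_id;
	/* lockless read of the RCU-protected namespace; may be NULL */
	oloc.pool_ns = ceph_try_get_string(layout->pool_ns);

	ret = ceph_object_locator_to_pg(osdmap, oid, &oloc, raw_pgid);

	ceph_oloc_destroy(&oloc);	/* drops the pool_ns reference, if any */
	return ret;
}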