Commit a66dd383 authored by Ilya Dryomov

libceph: switch to calc_target(), part 1

Replace __calc_request_pg() and most of __map_request() with
calc_target() and start using req->r_t.

ceph_osdc_build_request() however still encodes base_oid, because it's
called before calc_target() is and target_oid is empty at that point in
time; a printf in osdc_show() also shows base_oid.  This is fixed in
"libceph: switch to calc_target(), part 2".
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent 63244fa1
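
For orientation, here is a rough sketch of the struct ceph_osd_request_target members that the hunks below actually touch. This is only an illustrative subset inferred from how the fields are used in this patch; the real definition comes with calc_target() in the parent commit, so treat names and types here as assumptions rather than the authoritative layout:

/* Illustrative subset only -- not the full definition from the parent commit. */
struct ceph_osd_request_target {
        struct ceph_object_id base_oid;          /* aliased as r_base_oid below */
        struct ceph_object_locator base_oloc;    /* aliased as r_base_oloc below */
        struct ceph_object_locator target_oloc;  /* encoded on send, updated on redirect */
        struct ceph_pg pgid;                     /* replaces req->r_pgid */
        unsigned int flags;                      /* aliased as r_flags below */
        int osd;                                 /* replaces acting.primary */
};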
@@ -150,12 +150,13 @@ struct ceph_osd_request {
 	struct list_head r_linger_item;
 	struct list_head r_linger_osd_item;
 	struct ceph_osd *r_osd;
-	struct ceph_pg r_pgid;
-	int r_pg_osds[CEPH_PG_MAX_SIZE];
-	int r_num_pg_osds;
+
+	struct ceph_osd_request_target r_t;
+#define r_base_oid	r_t.base_oid
+#define r_base_oloc	r_t.base_oloc
+#define r_flags		r_t.flags
 
 	struct ceph_msg *r_request, *r_reply;
-	int r_flags;		/* any additional flags for the osd */
 	u32 r_sent;		/* >0 if r_request is sending/sent */
 
 	/* request osd ops array */
@@ -167,7 +168,6 @@ struct ceph_osd_request {
 	__le64 *r_request_pool;
 	void *r_request_pgid;
 	__le32 *r_request_attempts;
-	bool r_paused;
 	struct ceph_eversion *r_request_reassert_version;
 
 	int r_result;
@@ -186,11 +186,6 @@ struct ceph_osd_request {
 	struct inode *r_inode;		/* for use by callbacks */
 	void *r_priv;			/* ditto */
 
-	struct ceph_object_locator r_base_oloc;
-	struct ceph_object_id r_base_oid;
-	struct ceph_object_locator r_target_oloc;
-	struct ceph_object_id r_target_oid;
-
 	u64 r_snapid;
 	unsigned long r_stamp;		/* send OR check time */
...
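
The #define aliases above are what keep the rest of this patch small: call sites that still say req->r_base_oid, req->r_base_oloc or req->r_flags now silently resolve into req->r_t. A minimal standalone illustration of the trick (plain userspace C with hypothetical names, not kernel code):

#include <stdio.h>

struct target {
	long pool;		/* stands in for a field that moved into the target */
	unsigned int flags;
};

struct request {
	struct target t;	/* new home of the fields */
#define r_pool	t.pool		/* old field names keep compiling ... */
#define r_flags	t.flags		/* ... but now expand into the embedded target */
};

int main(void)
{
	struct request req = { .t = { .pool = 2, .flags = 0x1 } };

	/* A call site written against the old names needs no edit. */
	printf("pool=%ld flags=%#x\n", req.r_pool, req.r_flags);
	return 0;
}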
@@ -161,7 +161,7 @@ static int osdc_show(struct seq_file *s, void *pp)
 		seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid,
 			   req->r_osd ? req->r_osd->o_osd : -1,
-			   req->r_pgid.pool, req->r_pgid.seed);
+			   req->r_t.pgid.pool, req->r_t.pgid.seed);
 		seq_printf(s, "%*pE", req->r_base_oid.name_len,
 			   req->r_base_oid.name);
...
@@ -350,8 +350,7 @@ static void ceph_osdc_release_request(struct kref *kref)
 	for (which = 0; which < req->r_num_ops; which++)
 		osd_req_op_data_release(req, which);
 
-	ceph_oid_destroy(&req->r_base_oid);
-	ceph_oid_destroy(&req->r_target_oid);
+	target_destroy(&req->r_t);
 	ceph_put_snap_context(req->r_snapc);
 	if (req->r_mempool)
@@ -420,10 +419,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	INIT_LIST_HEAD(&req->r_req_lru_item);
 	INIT_LIST_HEAD(&req->r_osd_item);
 
-	ceph_oid_init(&req->r_base_oid);
-	req->r_base_oloc.pool = -1;
-	ceph_oid_init(&req->r_target_oid);
-	req->r_target_oloc.pool = -1;
+	target_init(&req->r_t);
 
 	dout("%s req %p\n", __func__, req);
 	return req;
@@ -1308,16 +1304,6 @@ static bool __pool_full(struct ceph_pg_pool_info *pi)
  *
  * Caller should hold map_sem for read.
  */
-static bool __req_should_be_paused(struct ceph_osd_client *osdc,
-				   struct ceph_osd_request *req)
-{
-	bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
-	bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
-		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
-	return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
-		(req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
-}
-
 static bool target_should_be_paused(struct ceph_osd_client *osdc,
 				    const struct ceph_osd_request_target *t,
 				    struct ceph_pg_pool_info *pi)
@@ -1332,45 +1318,6 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
 	       (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
 }
 
-/*
- * Calculate mapping of a request to a PG. Takes tiering into account.
- */
-static int __calc_request_pg(struct ceph_osdmap *osdmap,
-			     struct ceph_osd_request *req,
-			     struct ceph_pg *pg_out)
-{
-	bool need_check_tiering;
-
-	need_check_tiering = false;
-	if (req->r_target_oloc.pool == -1) {
-		req->r_target_oloc = req->r_base_oloc; /* struct */
-		need_check_tiering = true;
-	}
-	if (ceph_oid_empty(&req->r_target_oid)) {
-		ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
-		need_check_tiering = true;
-	}
-
-	if (need_check_tiering &&
-	    (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
-		struct ceph_pg_pool_info *pi;
-
-		pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
-		if (pi) {
-			if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
-			    pi->read_tier >= 0)
-				req->r_target_oloc.pool = pi->read_tier;
-			if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
-			    pi->write_tier >= 0)
-				req->r_target_oloc.pool = pi->write_tier;
-		}
-		/* !pi is caught in ceph_oloc_oid_to_pg() */
-	}
-
-	return ceph_object_locator_to_pg(osdmap, &req->r_target_oid,
-					 &req->r_target_oloc, pg_out);
-}
-
 enum calc_target_result {
 	CALC_TARGET_NO_ACTION = 0,
 	CALC_TARGET_NEED_RESEND,
@@ -1510,46 +1457,26 @@ static void __enqueue_request(struct ceph_osd_request *req)
 static int __map_request(struct ceph_osd_client *osdc,
 			 struct ceph_osd_request *req, int force_resend)
 {
-	struct ceph_pg pgid;
-	struct ceph_osds up, acting;
+	enum calc_target_result ct_res;
 	int err;
-	bool was_paused;
 
 	dout("map_request %p tid %lld\n", req, req->r_tid);
 
-	err = __calc_request_pg(osdc->osdmap, req, &pgid);
-	if (err) {
+	ct_res = calc_target(osdc, &req->r_t, NULL, force_resend);
+	switch (ct_res) {
+	case CALC_TARGET_POOL_DNE:
 		list_move(&req->r_req_lru_item, &osdc->req_notarget);
-		return err;
-	}
-
-	req->r_pgid = pgid;
-	ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
-
-	was_paused = req->r_paused;
-	req->r_paused = __req_should_be_paused(osdc, req);
-	if (was_paused && !req->r_paused)
-		force_resend = 1;
-
-	if ((!force_resend &&
-	     req->r_osd && req->r_osd->o_osd == acting.primary &&
-	     req->r_sent >= req->r_osd->o_incarnation &&
-	     req->r_num_pg_osds == acting.size &&
-	     memcmp(req->r_pg_osds, acting.osds,
-		    acting.size * sizeof(acting.osds[0])) == 0) ||
-	    (req->r_osd == NULL && acting.primary == -1) ||
-	    req->r_paused)
+		return -EIO;
+	case CALC_TARGET_NO_ACTION:
 		return 0;  /* no change */
+	default:
+		BUG_ON(ct_res != CALC_TARGET_NEED_RESEND);
+	}
 
 	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
-	     req->r_tid, pgid.pool, pgid.seed, acting.primary,
+	     req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd,
 	     req->r_osd ? req->r_osd->o_osd : -1);
 
-	/* record full pg acting set */
-	memcpy(req->r_pg_osds, acting.osds,
-	       acting.size * sizeof(acting.osds[0]));
-	req->r_num_pg_osds = acting.size;
-
 	if (req->r_osd) {
 		__cancel_request(req);
 		list_del_init(&req->r_osd_item);
@@ -1557,22 +1484,22 @@ static int __map_request(struct ceph_osd_client *osdc,
 		req->r_osd = NULL;
 	}
 
-	req->r_osd = lookup_osd(&osdc->osds, acting.primary);
-	if (!req->r_osd && acting.primary >= 0) {
+	req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd);
+	if (!req->r_osd && req->r_t.osd >= 0) {
 		err = -ENOMEM;
-		req->r_osd = create_osd(osdc, acting.primary);
+		req->r_osd = create_osd(osdc, req->r_t.osd);
 		if (!req->r_osd) {
 			list_move(&req->r_req_lru_item, &osdc->req_notarget);
 			goto out;
 		}
 
 		dout("map_request osd %p is osd%d\n", req->r_osd,
-		     acting.primary);
+		     req->r_osd->o_osd);
 		insert_osd(&osdc->osds, req->r_osd);
 
 		ceph_con_open(&req->r_osd->o_con,
-			      CEPH_ENTITY_TYPE_OSD, acting.primary,
-			      &osdc->osdmap->osd_addr[acting.primary]);
+			      CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd,
+			      &osdc->osdmap->osd_addr[req->r_osd->o_osd]);
 	}
 
 	__enqueue_request(req);
@@ -1592,15 +1519,15 @@ static void __send_request(struct ceph_osd_client *osdc,
 	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
 	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
-	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+	     req->r_t.pgid.pool, req->r_t.pgid.seed);
 
 	/* fill in message content that changes each time we send it */
 	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
 	put_unaligned_le32(req->r_flags, req->r_request_flags);
-	put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
+	put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool);
 
 	p = req->r_request_pgid;
-	ceph_encode_64(&p, req->r_pgid.pool);
-	ceph_encode_32(&p, req->r_pgid.seed);
+	ceph_encode_64(&p, req->r_t.pgid.pool);
+	ceph_encode_32(&p, req->r_t.pgid.seed);
 	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
 	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
 	       sizeof(req->r_reassert_version));
@@ -1963,7 +1890,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 		__unregister_request(osdc, req);
 
-		ceph_oloc_copy(&req->r_target_oloc, &redir.oloc);
+		ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc);
 
 		/*
 		 * Start redirect requests with nofail=true. If
...