Commit 6f6c7006 authored by Sage Weil's avatar Sage Weil

libceph: fix osd request queuing on osdmap updates

If we send a request to osd A, and the request's pg remaps to osd B and
then back to A in quick succession, we need to resend the request to A. The
old code was only calling kick_requests after processing all incremental
maps in a message, so it was very possible to not resend a request that
needed to be resent.  This would make the osd eventually time out (at least
with the current default of osd timeouts enabled).

The correct approach is to scan requests on every map incremental.  This
patch refactors the kick code in a few ways:
 - all requests are either on req_lru (in flight), req_unsent (ready to
   send), or req_notarget (currently map to no up osd)
 - mapping always done by map_request (previous map_osds)
 - if the mapping changes, we requeue.  requests are resent only after all
   map incrementals are processed.
 - some osd reset code is moved out of kick_requests into a separate
   function
 - the "kick this osd" functionality is moved to kick_osd_requests, as it
   is unrelated to scanning for request->pg->osd mapping changes
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 09adc80c
......@@ -74,7 +74,6 @@ struct ceph_osd_request {
char r_oid[40]; /* object name */
int r_oid_len;
unsigned long r_stamp; /* send OR check time */
bool r_resend; /* msg send failed, needs retry */
struct ceph_file_layout r_file_layout;
struct ceph_snap_context *r_snapc; /* snap context for writes */
......@@ -104,7 +103,9 @@ struct ceph_osd_client {
u64 timeout_tid; /* tid of timeout triggering rq */
u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */
struct list_head req_lru; /* pending requests lru */
struct list_head req_lru; /* in-flight lru */
struct list_head req_unsent; /* unsent/need-resend queue */
struct list_head req_notarget; /* map to no osd */
int num_requests;
struct delayed_work timeout_work;
struct delayed_work osds_timeout_work;
......
......@@ -22,10 +22,9 @@
#define OSD_OPREPLY_FRONT_LEN 512
static const struct ceph_connection_operations osd_con_ops;
static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd);
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static int op_needs_trail(int op)
{
......@@ -529,6 +528,35 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
return NULL;
}
/*
* Resubmit requests pending on the given osd.
*/
static void __kick_osd_requests(struct ceph_osd_client *osdc,
struct ceph_osd *osd)
{
struct ceph_osd_request *req;
int err;
dout("__kick_osd_requests osd%d\n", osd->o_osd);
err = __reset_osd(osdc, osd);
if (err == -EAGAIN)
return;
list_for_each_entry(req, &osd->o_requests, r_osd_item) {
list_move(&req->r_req_lru_item, &osdc->req_unsent);
dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
req->r_flags |= CEPH_OSD_FLAG_RETRY;
}
}
static void kick_osd_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
mutex_lock(&osdc->request_mutex);
__kick_osd_requests(osdc, kickosd);
mutex_unlock(&osdc->request_mutex);
}
/*
* If the osd connection drops, we need to resubmit all requests.
......@@ -543,7 +571,8 @@ static void osd_reset(struct ceph_connection *con)
dout("osd_reset osd%d\n", osd->o_osd);
osdc = osd->o_osdc;
down_read(&osdc->map_sem);
kick_requests(osdc, osd);
kick_osd_requests(osdc, osd);
send_queued(osdc);
up_read(&osdc->map_sem);
}
......@@ -781,20 +810,20 @@ static void __cancel_request(struct ceph_osd_request *req)
ceph_con_revoke(&req->r_osd->o_con, req->r_request);
req->r_sent = 0;
}
list_del_init(&req->r_req_lru_item);
}
/*
* Pick an osd (the first 'up' osd in the pg), allocate the osd struct
* (as needed), and set the request r_osd appropriately. If there is
* no up osd, set r_osd to NULL.
* no up osd, set r_osd to NULL. Move the request to the appropiate list
* (unsent, homeless) or leave on in-flight lru.
*
* Return 0 if unchanged, 1 if changed, or negative on error.
*
* Caller should hold map_sem for read and request_mutex.
*/
static int __map_osds(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
static int __map_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
struct ceph_pg pgid;
......@@ -802,11 +831,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
int o = -1, num = 0;
int err;
dout("map_osds %p tid %lld\n", req, req->r_tid);
dout("map_request %p tid %lld\n", req, req->r_tid);
err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
&req->r_file_layout, osdc->osdmap);
if (err)
if (err) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
return err;
}
pgid = reqhead->layout.ol_pgid;
req->r_pgid = pgid;
......@@ -823,7 +854,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
(req->r_osd == NULL && o == -1))
return 0; /* no change */
dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
req->r_osd ? req->r_osd->o_osd : -1);
......@@ -841,10 +872,12 @@ static int __map_osds(struct ceph_osd_client *osdc,
if (!req->r_osd && o >= 0) {
err = -ENOMEM;
req->r_osd = create_osd(osdc);
if (!req->r_osd)
if (!req->r_osd) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
goto out;
}
dout("map_osds osd %p is osd%d\n", req->r_osd, o);
dout("map_request osd %p is osd%d\n", req->r_osd, o);
req->r_osd->o_osd = o;
req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
__insert_osd(osdc, req->r_osd);
......@@ -855,6 +888,9 @@ static int __map_osds(struct ceph_osd_client *osdc,
if (req->r_osd) {
__remove_osd_from_lru(req->r_osd);
list_add(&req->r_osd_item, &req->r_osd->o_requests);
list_move(&req->r_req_lru_item, &osdc->req_unsent);
} else {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
}
err = 1; /* osd or pg changed */
......@@ -869,16 +905,6 @@ static int __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead;
int err;
err = __map_osds(osdc, req);
if (err < 0)
return err;
if (req->r_osd == NULL) {
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_next_osdmap(&osdc->client->monc);
return 0;
}
dout("send_request %p tid %llu to osd%d flags %d\n",
req, req->r_tid, req->r_osd->o_osd, req->r_flags);
......@@ -897,6 +923,21 @@ static int __send_request(struct ceph_osd_client *osdc,
return 0;
}
/*
* Send any requests in the queue (req_unsent).
*/
static void send_queued(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req, *tmp;
dout("send_queued\n");
mutex_lock(&osdc->request_mutex);
list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
__send_request(osdc, req);
}
mutex_unlock(&osdc->request_mutex);
}
/*
* Timeout callback, called every N seconds when 1 or more osd
* requests has been active for more than N seconds. When this
......@@ -916,7 +957,6 @@ static void handle_timeout(struct work_struct *work)
unsigned long keepalive =
osdc->client->options->osd_keepalive_timeout * HZ;
unsigned long last_stamp = 0;
struct rb_node *p;
struct list_head slow_osds;
dout("timeout\n");
......@@ -925,21 +965,6 @@ static void handle_timeout(struct work_struct *work)
ceph_monc_request_next_osdmap(&osdc->client->monc);
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_osd_request, r_node);
if (req->r_resend) {
int err;
dout("osdc resending prev failed %lld\n", req->r_tid);
err = __send_request(osdc, req);
if (err)
dout("osdc failed again on %lld\n", req->r_tid);
else
req->r_resend = false;
continue;
}
}
/*
* reset osds that appear to be _really_ unresponsive. this
......@@ -963,7 +988,7 @@ static void handle_timeout(struct work_struct *work)
BUG_ON(!osd);
pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
req->r_tid, osd->o_osd);
__kick_requests(osdc, osd);
__kick_osd_requests(osdc, osd);
}
/*
......@@ -991,7 +1016,7 @@ static void handle_timeout(struct work_struct *work)
__schedule_osd_timeout(osdc);
mutex_unlock(&osdc->request_mutex);
send_queued(osdc);
up_read(&osdc->map_sem);
}
......@@ -1109,108 +1134,61 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
ceph_msg_dump(msg);
}
static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
static void reset_changed_osds(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req;
struct rb_node *p, *n;
int needmap = 0;
int err;
dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
if (kickosd) {
err = __reset_osd(osdc, kickosd);
if (err == -EAGAIN)
return 1;
} else {
for (p = rb_first(&osdc->osds); p; p = n) {
struct ceph_osd *osd =
rb_entry(p, struct ceph_osd, o_node);
n = rb_next(p);
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
memcmp(&osd->o_con.peer_addr,
ceph_osd_addr(osdc->osdmap,
osd->o_osd),
sizeof(struct ceph_entity_addr)) != 0)
__reset_osd(osdc, osd);
}
}
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_osd_request, r_node);
if (req->r_resend) {
dout(" r_resend set on tid %llu\n", req->r_tid);
__cancel_request(req);
goto kick;
}
if (req->r_osd && kickosd == req->r_osd) {
__cancel_request(req);
goto kick;
}
err = __map_osds(osdc, req);
if (err == 0)
continue; /* no change */
if (err < 0) {
/*
* FIXME: really, we should set the request
* error and fail if this isn't a 'nofail'
* request, but that's a fair bit more
* complicated to do. So retry!
*/
dout(" setting r_resend on %llu\n", req->r_tid);
req->r_resend = true;
continue;
}
if (req->r_osd == NULL) {
dout("tid %llu maps to no valid osd\n", req->r_tid);
needmap++; /* request a newer map */
continue;
}
for (p = rb_first(&osdc->osds); p; p = n) {
struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
kick:
dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
req->r_flags |= CEPH_OSD_FLAG_RETRY;
err = __send_request(osdc, req);
if (err) {
dout(" setting r_resend on %llu\n", req->r_tid);
req->r_resend = true;
}
n = rb_next(p);
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
memcmp(&osd->o_con.peer_addr,
ceph_osd_addr(osdc->osdmap,
osd->o_osd),
sizeof(struct ceph_entity_addr)) != 0)
__reset_osd(osdc, osd);
}
return needmap;
}
/*
* Resubmit osd requests whose osd or osd address has changed. Request
* a new osd map if osds are down, or we are otherwise unable to determine
* how to direct a request.
*
* Close connections to down osds.
*
* If @who is specified, resubmit requests for that specific osd.
* Requeue requests whose mapping to an OSD has changed. If requests map to
* no osd, request a new map.
*
* Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
static void kick_requests(struct ceph_osd_client *osdc)
{
int needmap;
struct ceph_osd_request *req;
struct rb_node *p;
int needmap = 0;
int err;
dout("kick_requests\n");
mutex_lock(&osdc->request_mutex);
needmap = __kick_requests(osdc, kickosd);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_osd_request, r_node);
err = __map_request(osdc, req);
if (err < 0)
continue; /* error */
if (req->r_osd == NULL) {
dout("%p tid %llu maps to no osd\n", req, req->r_tid);
needmap++; /* request a newer map */
} else if (err > 0) {
dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
req->r_flags |= CEPH_OSD_FLAG_RETRY;
}
}
mutex_unlock(&osdc->request_mutex);
if (needmap) {
dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc);
}
}
/*
* Process updated osd map.
*
......@@ -1263,6 +1241,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap;
}
kick_requests(osdc);
reset_changed_osds(osdc);
} else {
dout("ignoring incremental map %u len %d\n",
epoch, maplen);
......@@ -1300,6 +1280,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap = newmap;
if (oldmap)
ceph_osdmap_destroy(oldmap);
kick_requests(osdc);
}
p += maplen;
nr_maps--;
......@@ -1308,8 +1289,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
done:
downgrade_write(&osdc->map_sem);
ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
if (newmap)
kick_requests(osdc, NULL);
send_queued(osdc);
up_read(&osdc->map_sem);
wake_up_all(&osdc->client->auth_wq);
return;
......@@ -1347,15 +1327,22 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
* the request still han't been touched yet.
*/
if (req->r_sent == 0) {
rc = __send_request(osdc, req);
if (rc) {
if (nofail) {
dout("osdc_start_request failed send, "
" marking %lld\n", req->r_tid);
req->r_resend = true;
rc = 0;
} else {
__unregister_request(osdc, req);
rc = __map_request(osdc, req);
if (rc < 0)
return rc;
if (req->r_osd == NULL) {
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_next_osdmap(&osdc->client->monc);
} else {
rc = __send_request(osdc, req);
if (rc) {
if (nofail) {
dout("osdc_start_request failed send, "
" will retry %lld\n", req->r_tid);
rc = 0;
} else {
__unregister_request(osdc, req);
}
}
}
}
......@@ -1441,6 +1428,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT;
INIT_LIST_HEAD(&osdc->req_lru);
INIT_LIST_HEAD(&osdc->req_unsent);
INIT_LIST_HEAD(&osdc->req_notarget);
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment