Commit 5aea3dcd authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: a major OSD client update

This is a major sync up, up to ~Jewel.  The highlights are:

- per-session request trees (vs a global per-client tree)
- per-session locking (vs a global per-client rwlock)
- homeless OSD session
- no ad-hoc global per-client lists
- support for pool quotas
- foundation for watch/notify v2 support
- foundation for map check (pool deletion detection) support

The switchover is incomplete: lingering requests can be setup and
teared down but aren't ever reestablished.  This functionality is
restored with the introduction of the new lingering infrastructure
(ceph_osd_linger_request, linger_work, etc) in a later commit.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 9dd2845c
......@@ -193,12 +193,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
if (copy_from_user(&dl, arg, sizeof(dl)))
return -EFAULT;
down_read(&osdc->map_sem);
down_read(&osdc->lock);
r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
&dl.object_no, &dl.object_offset,
&olen);
if (r < 0) {
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return -EIO;
}
dl.file_offset -= dl.object_offset;
......@@ -217,7 +217,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
if (r < 0) {
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return r;
}
......@@ -230,7 +230,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
} else {
memset(&dl.osd_addr, 0, sizeof(dl.osd_addr));
}
up_read(&osdc->map_sem);
up_read(&osdc->lock);
/* send result back to user */
if (copy_to_user(arg, &dl, sizeof(dl)))
......
......@@ -75,7 +75,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
char buf[128];
dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
down_read(&osdc->map_sem);
down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name) {
size_t len = strlen(pool_name);
......@@ -107,7 +107,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
ret = -ERANGE;
}
}
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return ret;
}
......@@ -141,13 +141,13 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
const char *pool_name;
down_read(&osdc->map_sem);
down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name)
ret = snprintf(val, size, "%s", pool_name);
else
ret = snprintf(val, size, "%lld", (unsigned long long)pool);
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return ret;
}
......
......@@ -33,12 +33,13 @@ struct ceph_osd {
int o_incarnation;
struct rb_node o_node;
struct ceph_connection o_con;
struct list_head o_requests;
struct rb_root o_requests;
struct list_head o_linger_requests;
struct list_head o_osd_lru;
struct ceph_auth_handshake o_auth;
unsigned long lru_ttl;
struct list_head o_keepalive_item;
struct mutex lock;
};
#define CEPH_OSD_SLAB_OPS 2
......@@ -144,8 +145,6 @@ struct ceph_osd_request_target {
struct ceph_osd_request {
u64 r_tid; /* unique for this client */
struct rb_node r_node;
struct list_head r_req_lru_item;
struct list_head r_osd_item;
struct list_head r_linger_item;
struct list_head r_linger_osd_item;
struct ceph_osd *r_osd;
......@@ -219,19 +218,16 @@ struct ceph_osd_client {
struct ceph_client *client;
struct ceph_osdmap *osdmap; /* current map */
struct rw_semaphore map_sem;
struct rw_semaphore lock;
struct mutex request_mutex;
struct rb_root osds; /* osds */
struct list_head osd_lru; /* idle osds */
spinlock_t osd_lru_lock;
u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */
struct list_head req_lru; /* in-flight lru */
struct list_head req_unsent; /* unsent/need-resend queue */
struct list_head req_notarget; /* map to no osd */
struct list_head req_linger; /* lingering requests */
int num_requests;
struct ceph_osd homeless_osd;
atomic64_t last_tid; /* tid of last request */
atomic_t num_requests;
atomic_t num_homeless;
struct delayed_work timeout_work;
struct delayed_work osds_timeout_work;
#ifdef CONFIG_DEBUG_FS
......
......@@ -182,21 +182,39 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
seq_putc(s, '\n');
}
static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
{
struct rb_node *n;
mutex_lock(&osd->lock);
for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
dump_request(s, req);
}
mutex_unlock(&osd->lock);
}
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
struct ceph_osd_client *osdc = &client->osdc;
struct rb_node *p;
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
struct ceph_osd_request *req;
struct rb_node *n;
req = rb_entry(p, struct ceph_osd_request, r_node);
down_read(&osdc->lock);
seq_printf(s, "REQUESTS %d homeless %d\n",
atomic_read(&osdc->num_requests),
atomic_read(&osdc->num_homeless));
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
dump_request(s, req);
dump_requests(s, osd);
}
mutex_unlock(&osdc->request_mutex);
dump_requests(s, &osdc->homeless_osd);
up_read(&osdc->lock);
return 0;
}
......
......@@ -25,16 +25,6 @@ static struct kmem_cache *ceph_osd_request_cache;
static const struct ceph_connection_operations osd_con_ops;
static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __unregister_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __enqueue_request(struct ceph_osd_request *req);
/*
* Implement client access to distributed object storage cluster.
*
......@@ -53,6 +43,43 @@ static void __enqueue_request(struct ceph_osd_request *req);
* channel with an OSD is reset.
*/
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
bool wrlocked = true;
if (unlikely(down_read_trylock(sem))) {
wrlocked = false;
up_read(sem);
}
return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
struct ceph_osd_client *osdc = osd->o_osdc;
WARN_ON(!(mutex_is_locked(&osd->lock) &&
rwsem_is_locked(&osdc->lock)) &&
!rwsem_is_wrlocked(&osdc->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
#endif
/*
* calculate the mapping of a file extent onto an object, and fill out the
* request accordingly. shorten extent as necessary if it crosses an
......@@ -336,18 +363,14 @@ static void ceph_osdc_release_request(struct kref *kref)
dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
req->r_request, req->r_reply);
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
WARN_ON(!list_empty(&req->r_req_lru_item));
WARN_ON(!list_empty(&req->r_osd_item));
WARN_ON(!list_empty(&req->r_linger_item));
WARN_ON(!list_empty(&req->r_linger_osd_item));
WARN_ON(req->r_osd);
if (req->r_request)
ceph_msg_put(req->r_request);
if (req->r_reply) {
ceph_msg_revoke_incoming(req->r_reply);
if (req->r_reply)
ceph_msg_put(req->r_reply);
}
for (which = 0; which < req->r_num_ops; which++)
osd_req_op_data_release(req, which);
......@@ -418,8 +441,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_unsafe_item);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd_item);
INIT_LIST_HEAD(&req->r_req_lru_item);
INIT_LIST_HEAD(&req->r_osd_item);
target_init(&req->r_t);
......@@ -869,141 +890,11 @@ static bool osd_homeless(struct ceph_osd *osd)
return osd->o_osd == CEPH_HOMELESS_OSD;
}
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
u64 tid)
{
struct ceph_osd_request *req;
struct rb_node *n = osdc->requests.rb_node;
while (n) {
req = rb_entry(n, struct ceph_osd_request, r_node);
if (tid < req->r_tid) {
if (!n->rb_left)
return req;
n = n->rb_left;
} else if (tid > req->r_tid) {
n = n->rb_right;
} else {
return req;
}
}
return NULL;
}
static void __kick_linger_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd = req->r_osd;
/*
* Linger requests need to be resent with a new tid to avoid
* the dup op detection logic on the OSDs. Achieve this with
* a re-register dance instead of open-coding.
*/
ceph_osdc_get_request(req);
if (!list_empty(&req->r_linger_item))
__unregister_linger_request(osdc, req);
else
__unregister_request(osdc, req);
__register_request(osdc, req);
ceph_osdc_put_request(req);
/*
* Unless request has been registered as both normal and
* lingering, __unregister{,_linger}_request clears r_osd.
* However, here we need to preserve r_osd to make sure we
* requeue on the same OSD.
*/
WARN_ON(req->r_osd || !osd);
req->r_osd = osd;
dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
__enqueue_request(req);
}
/*
* Resubmit requests pending on the given osd.
*/
static void __kick_osd_requests(struct ceph_osd_client *osdc,
struct ceph_osd *osd)
{
struct ceph_osd_request *req, *nreq;
LIST_HEAD(resend);
LIST_HEAD(resend_linger);
int err;
dout("%s osd%d\n", __func__, osd->o_osd);
err = __reset_osd(osdc, osd);
if (err)
return;
/*
* Build up a list of requests to resend by traversing the
* osd's list of requests. Requests for a given object are
* sent in tid order, and that is also the order they're
* kept on this list. Therefore all requests that are in
* flight will be found first, followed by all requests that
* have not yet been sent. And to resend requests while
* preserving this order we will want to put any sent
* requests back on the front of the osd client's unsent
* list.
*
* So we build a separate ordered list of already-sent
* requests for the affected osd and splice it onto the
* front of the osd client's unsent list. Once we've seen a
* request that has not yet been sent we're done. Those
* requests are already sitting right where they belong.
*/
list_for_each_entry(req, &osd->o_requests, r_osd_item) {
if (!req->r_sent)
break;
if (!req->r_linger) {
dout("%s requeueing %p tid %llu\n", __func__, req,
req->r_tid);
list_move_tail(&req->r_req_lru_item, &resend);
req->r_flags |= CEPH_OSD_FLAG_RETRY;
} else {
list_move_tail(&req->r_req_lru_item, &resend_linger);
}
}
list_splice(&resend, &osdc->req_unsent);
/*
* Both registered and not yet registered linger requests are
* enqueued with a new tid on the same OSD. We add/move them
* to req_unsent/o_requests at the end to keep things in tid
* order.
*/
list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
r_linger_osd_item) {
WARN_ON(!list_empty(&req->r_req_lru_item));
__kick_linger_request(req);
}
list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
__kick_linger_request(req);
}
/*
* If the osd connection drops, we need to resubmit all requests.
*/
static void osd_reset(struct ceph_connection *con)
static bool osd_registered(struct ceph_osd *osd)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc;
verify_osdc_locked(osd->o_osdc);
if (!osd)
return;
dout("osd_reset osd%d\n", osd->o_osd);
osdc = osd->o_osdc;
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
__kick_osd_requests(osdc, osd);
__send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
return !RB_EMPTY_NODE(&osd->o_node);
}
/*
......@@ -1013,17 +904,18 @@ static void osd_init(struct ceph_osd *osd)
{
atomic_set(&osd->o_ref, 1);
RB_CLEAR_NODE(&osd->o_node);
INIT_LIST_HEAD(&osd->o_requests);
osd->o_requests = RB_ROOT;
INIT_LIST_HEAD(&osd->o_linger_requests);
INIT_LIST_HEAD(&osd->o_osd_lru);
INIT_LIST_HEAD(&osd->o_keepalive_item);
osd->o_incarnation = 1;
mutex_init(&osd->lock);
}
static void osd_cleanup(struct ceph_osd *osd)
{
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
WARN_ON(!list_empty(&osd->o_requests));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests));
WARN_ON(!list_empty(&osd->o_osd_lru));
WARN_ON(!list_empty(&osd->o_keepalive_item));
......@@ -1077,30 +969,6 @@ static void put_osd(struct ceph_osd *osd)
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
/*
* remove an osd from our map
*/
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
WARN_ON(!list_empty(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests));
list_del_init(&osd->o_osd_lru);
erase_osd(&osdc->osds, osd);
}
static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
if (!RB_EMPTY_NODE(&osd->o_node)) {
ceph_con_close(&osd->o_con);
__remove_osd(osdc, osd);
put_osd(osd);
}
}
static void __move_osd_to_lru(struct ceph_osd *osd)
{
struct ceph_osd_client *osdc = osd->o_osdc;
......@@ -1117,7 +985,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd)
static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
if (list_empty(&osd->o_requests) &&
if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests))
__move_osd_to_lru(osd);
}
......@@ -1134,30 +1002,64 @@ static void __remove_osd_from_lru(struct ceph_osd *osd)
spin_unlock(&osdc->osd_lru_lock);
}
/*
* Close the connection and assign any leftover requests to the
* homeless session.
*/
static void close_osd(struct ceph_osd *osd)
{
struct ceph_osd_client *osdc = osd->o_osdc;
struct rb_node *n;
verify_osdc_wrlocked(osdc);
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
ceph_con_close(&osd->o_con);
for (n = rb_first(&osd->o_requests); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
n = rb_next(n); /* unlink_request() */
dout(" reassigning req %p tid %llu\n", req, req->r_tid);
unlink_request(osd, req);
link_request(&osdc->homeless_osd, req);
}
__remove_osd_from_lru(osd);
erase_osd(&osdc->osds, osd);
put_osd(osd);
}
/*
* reset osd connect
*/
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
static int reopen_osd(struct ceph_osd *osd)
{
struct ceph_entity_addr *peer_addr;
dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests) &&
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
remove_osd(osdc, osd);
close_osd(osd);
return -ENODEV;
}
peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
!ceph_con_opened(&osd->o_con)) {
struct ceph_osd_request *req;
struct rb_node *n;
dout("osd addr hasn't changed and connection never opened, "
"letting msgr retry\n");
/* touch each r_stamp for handle_timeout()'s benfit */
list_for_each_entry(req, &osd->o_requests, r_osd_item)
for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
req->r_stamp = jiffies;
}
return -EAGAIN;
}
......@@ -1169,73 +1071,84 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
return 0;
}
/*
* Register request, assign tid. If this is the first request, set up
* the timeout event.
*/
static void __register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
req->r_tid = ++osdc->last_tid;
req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
dout("__register_request %p tid %lld\n", req, req->r_tid);
insert_request(&osdc->requests, req);
ceph_osdc_get_request(req);
osdc->num_requests++;
}
/*
* called under osdc->request_mutex
*/
static void __unregister_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
bool wrlocked)
{
if (RB_EMPTY_NODE(&req->r_node)) {
dout("__unregister_request %p tid %lld not registered\n",
req, req->r_tid);
return;
}
struct ceph_osd *osd;
dout("__unregister_request %p tid %lld\n", req, req->r_tid);
erase_request(&osdc->requests, req);
osdc->num_requests--;
if (wrlocked)
verify_osdc_wrlocked(osdc);
else
verify_osdc_locked(osdc);
if (req->r_osd) {
/* make sure the original request isn't in flight. */
ceph_msg_revoke(req->r_request);
if (o != CEPH_HOMELESS_OSD)
osd = lookup_osd(&osdc->osds, o);
else
osd = &osdc->homeless_osd;
if (!osd) {
if (!wrlocked)
return ERR_PTR(-EAGAIN);
list_del_init(&req->r_osd_item);
maybe_move_osd_to_lru(req->r_osd);
if (list_empty(&req->r_linger_osd_item))
req->r_osd = NULL;
osd = create_osd(osdc, o);
insert_osd(&osdc->osds, osd);
ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
&osdc->osdmap->osd_addr[osd->o_osd]);
}
list_del_init(&req->r_req_lru_item);
ceph_osdc_put_request(req);
dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
return osd;
}
/*
* Cancel a previously queued request message
* Create request <-> OSD session relation.
*
* @req has to be assigned a tid, @osd may be homeless.
*/
static void __cancel_request(struct ceph_osd_request *req)
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
if (req->r_sent && req->r_osd) {
ceph_msg_revoke(req->r_request);
req->r_sent = 0;
}
verify_osd_locked(osd);
WARN_ON(!req->r_tid || req->r_osd);
dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
req, req->r_tid);
if (!osd_homeless(osd))
__remove_osd_from_lru(osd);
else
atomic_inc(&osd->o_osdc->num_homeless);
get_osd(osd);
insert_request(&osd->o_requests, req);
req->r_osd = osd;
}
static void __register_linger_request(struct ceph_osd_client *osdc,
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
verify_osd_locked(osd);
WARN_ON(req->r_osd != osd);
dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
req, req->r_tid);
req->r_osd = NULL;
erase_request(&osd->o_requests, req);
put_osd(osd);
if (!osd_homeless(osd))
maybe_move_osd_to_lru(osd);
else
atomic_dec(&osd->o_osdc->num_homeless);
}
static void __register_linger_request(struct ceph_osd *osd,
struct ceph_osd_request *req)
{
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
WARN_ON(!req->r_linger);
ceph_osdc_get_request(req);
list_add_tail(&req->r_linger_item, &osdc->req_linger);
if (req->r_osd)
list_add_tail(&req->r_linger_osd_item,
&req->r_osd->o_linger_requests);
list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
__remove_osd_from_lru(osd);
req->r_osd = osd;
}
static void __unregister_linger_request(struct ceph_osd_client *osdc,
......@@ -1255,7 +1168,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
if (req->r_osd) {
list_del_init(&req->r_linger_osd_item);
maybe_move_osd_to_lru(req->r_osd);
if (list_empty(&req->r_osd_item))
if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
req->r_osd = NULL;
}
ceph_osdc_put_request(req);
......@@ -1291,11 +1204,20 @@ static bool have_pool_full(struct ceph_osd_client *osdc)
return false;
}
static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
{
struct ceph_pg_pool_info *pi;
pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
if (!pi)
return false;
return __pool_full(pi);
}
/*
* Returns whether a request should be blocked from being sent
* based on the current osdmap and osd_client settings.
*
* Caller should hold map_sem for read.
*/
static bool target_should_be_paused(struct ceph_osd_client *osdc,
const struct ceph_osd_request_target *t,
......@@ -1421,87 +1343,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
return ct_res;
}
static void __enqueue_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
if (req->r_osd) {
__remove_osd_from_lru(req->r_osd);
list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
} else {
list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
}
}
/*
* Pick an osd (the first 'up' osd in the pg), allocate the osd struct
* (as needed), and set the request r_osd appropriately. If there is
* no up osd, set r_osd to NULL. Move the request to the appropriate list
* (unsent, homeless) or leave on in-flight lru.
*
* Return 0 if unchanged, 1 if changed, or negative on error.
*
* Caller should hold map_sem for read and request_mutex.
*/
static int __map_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req, int force_resend)
{
enum calc_target_result ct_res;
int err;
dout("map_request %p tid %lld\n", req, req->r_tid);
ct_res = calc_target(osdc, &req->r_t, NULL, force_resend);
switch (ct_res) {
case CALC_TARGET_POOL_DNE:
list_move(&req->r_req_lru_item, &osdc->req_notarget);
return -EIO;
case CALC_TARGET_NO_ACTION:
return 0; /* no change */
default:
BUG_ON(ct_res != CALC_TARGET_NEED_RESEND);
}
dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd,
req->r_osd ? req->r_osd->o_osd : -1);
if (req->r_osd) {
__cancel_request(req);
list_del_init(&req->r_osd_item);
list_del_init(&req->r_linger_osd_item);
req->r_osd = NULL;
}
req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd);
if (!req->r_osd && req->r_t.osd >= 0) {
err = -ENOMEM;
req->r_osd = create_osd(osdc, req->r_t.osd);
if (!req->r_osd) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
goto out;
}
dout("map_request osd %p is osd%d\n", req->r_osd,
req->r_osd->o_osd);
insert_osd(&osdc->osds, req->r_osd);
ceph_con_open(&req->r_osd->o_con,
CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd,
&osdc->osdmap->osd_addr[req->r_osd->o_osd]);
}
__enqueue_request(req);
err = 1; /* osd or pg changed */
out:
return err;
}
static void setup_request_data(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
......@@ -1648,8 +1489,16 @@ static void send_request(struct ceph_osd_request *req)
{
struct ceph_osd *osd = req->r_osd;
verify_osd_locked(osd);
WARN_ON(osd->o_osd != req->r_t.osd);
/*
* We may have a previously queued request message hanging
* around. Cancel it to avoid corrupting the msgr.
*/
if (req->r_sent)
ceph_msg_revoke(req->r_request);
req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
if (req->r_attempts)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
......@@ -1671,24 +1520,11 @@ static void send_request(struct ceph_osd_request *req)
ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}
/*
* Send any requests in the queue (req_unsent).
*/
static void __send_queued(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req, *tmp;
dout("__send_queued\n");
list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
send_request(req);
}
}
static void maybe_request_map(struct ceph_osd_client *osdc)
{
bool continuous = false;
verify_osdc_locked(osdc);
WARN_ON(!osdc->osdmap->epoch);
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
......@@ -1705,38 +1541,121 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
ceph_monc_renew_subs(&osdc->client->monc);
}
/*
* Caller should hold map_sem for read and request_mutex.
*/
static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
bool nofail)
static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
int rc;
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd;
bool need_send = false;
bool promoted = false;
__register_request(osdc, req);
req->r_sent = 0;
req->r_got_reply = 0;
rc = __map_request(osdc, req, 0);
if (rc < 0) {
if (nofail) {
dout("osdc_start_request failed map, "
" will retry %lld\n", req->r_tid);
rc = 0;
} else {
__unregister_request(osdc, req);
}
return rc;
WARN_ON(req->r_tid || req->r_got_reply);
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
again:
calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
if (IS_ERR(osd)) {
WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
goto promote;
}
if (req->r_osd == NULL) {
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_next_osdmap(&osdc->client->monc);
if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
dout("req %p pausewr\n", req);
req->r_t.paused = true;
maybe_request_map(osdc);
} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
dout("req %p pauserd\n", req);
req->r_t.paused = true;
maybe_request_map(osdc);
} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
!(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
CEPH_OSD_FLAG_FULL_FORCE)) &&
(ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
pool_full(osdc, req->r_t.base_oloc.pool))) {
dout("req %p full/pool_full\n", req);
pr_warn_ratelimited("FULL or reached pool quota\n");
req->r_t.paused = true;
maybe_request_map(osdc);
} else if (!osd_homeless(osd)) {
need_send = true;
} else {
__send_queued(osdc);
maybe_request_map(osdc);
}
return 0;
mutex_lock(&osd->lock);
/*
* Assign the tid atomically with send_request() to protect
* multiple writes to the same object from racing with each
* other, resulting in out of order ops on the OSDs.
*/
req->r_tid = atomic64_inc_return(&osdc->last_tid);
link_request(osd, req);
if (need_send)
send_request(req);
mutex_unlock(&osd->lock);
if (promoted)
downgrade_write(&osdc->lock);
return;
promote:
up_read(&osdc->lock);
down_write(&osdc->lock);
wrlocked = true;
promoted = true;
goto again;
}
static void account_request(struct ceph_osd_request *req)
{
unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
if (req->r_flags & CEPH_OSD_FLAG_READ) {
WARN_ON(req->r_flags & mask);
req->r_flags |= CEPH_OSD_FLAG_ACK;
} else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
WARN_ON(!(req->r_flags & mask));
else
WARN_ON(1);
WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
atomic_inc(&req->r_osdc->num_requests);
}
static void submit_request(struct ceph_osd_request *req, bool wrlocked)
{
ceph_osdc_get_request(req);
account_request(req);
__submit_request(req, wrlocked);
}
static void __finish_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd = req->r_osd;
verify_osd_locked(osd);
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
unlink_request(osd, req);
atomic_dec(&osdc->num_requests);
/*
* If an OSD has failed or returned and a request has been sent
* twice, it's possible to get a reply and end up here while the
* request message is queued for delivery. We will ignore the
* reply, so not a big deal, but better to try and catch it.
*/
ceph_msg_revoke(req->r_request);
ceph_msg_revoke_incoming(req->r_reply);
}
static void finish_request(struct ceph_osd_request *req)
{
__finish_request(req);
ceph_osdc_put_request(req);
}
static void __complete_request(struct ceph_osd_request *req)
......@@ -1747,6 +1666,13 @@ static void __complete_request(struct ceph_osd_request *req)
complete_all(&req->r_completion);
}
static void cancel_request(struct ceph_osd_request *req)
{
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
finish_request(req);
}
/*
* Timeout callback, called every N seconds. When 1 or more OSD
* requests has been active for more than N seconds, we send a keepalive
......@@ -1758,44 +1684,49 @@ static void handle_timeout(struct work_struct *work)
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
struct ceph_options *opts = osdc->client->options;
struct ceph_osd_request *req;
struct ceph_osd *osd;
struct list_head slow_osds;
dout("timeout\n");
down_read(&osdc->map_sem);
ceph_monc_request_next_osdmap(&osdc->client->monc);
unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
LIST_HEAD(slow_osds);
struct rb_node *n, *p;
mutex_lock(&osdc->request_mutex);
dout("%s osdc %p\n", __func__, osdc);
down_write(&osdc->lock);
/*
* ping osds that are a bit slow. this ensures that if there
* is a break in the TCP connection we will notice, and reopen
* a connection with that osd (from the fault callback).
*/
INIT_LIST_HEAD(&slow_osds);
list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
if (time_before(jiffies,
req->r_stamp + opts->osd_keepalive_timeout))
break;
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
bool found = false;
for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
struct ceph_osd_request *req =
rb_entry(p, struct ceph_osd_request, r_node);
if (time_before(req->r_stamp, cutoff)) {
dout(" req %p tid %llu on osd%d is laggy\n",
req, req->r_tid, osd->o_osd);
found = true;
}
}
osd = req->r_osd;
BUG_ON(!osd);
dout(" tid %llu is slow, will send keepalive on osd%d\n",
req->r_tid, osd->o_osd);
list_move_tail(&osd->o_keepalive_item, &slow_osds);
if (found)
list_move_tail(&osd->o_keepalive_item, &slow_osds);
}
if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
maybe_request_map(osdc);
while (!list_empty(&slow_osds)) {
osd = list_entry(slow_osds.next, struct ceph_osd,
o_keepalive_item);
struct ceph_osd *osd = list_first_entry(&slow_osds,
struct ceph_osd,
o_keepalive_item);
list_del_init(&osd->o_keepalive_item);
ceph_con_keepalive(&osd->o_con);
}
__send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
up_write(&osdc->lock);
schedule_delayed_work(&osdc->timeout_work,
osdc->client->options->osd_keepalive_timeout);
}
......@@ -1809,18 +1740,17 @@ static void handle_osds_timeout(struct work_struct *work)
struct ceph_osd *osd, *nosd;
dout("%s osdc %p\n", __func__, osdc);
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
down_write(&osdc->lock);
list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
if (time_before(jiffies, osd->lru_ttl))
break;
remove_osd(osdc, osd);
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests));
close_osd(osd);
}
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
up_write(&osdc->lock);
schedule_delayed_work(&osdc->osds_timeout_work,
round_jiffies_relative(delay));
}
......@@ -2045,8 +1975,9 @@ static bool done_request(const struct ceph_osd_request *req,
* when we get the safe reply r_unsafe_cb(false), r_cb/r_completion,
* r_safe_completion r_safe_completion
*/
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
{
struct ceph_osd_client *osdc = osd->o_osdc;
struct ceph_osd_request *req;
struct MOSDOpReply m;
u64 tid = le64_to_cpu(msg->hdr.tid);
......@@ -2057,14 +1988,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
dout("%s msg %p tid %llu\n", __func__, msg, tid);
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
req = lookup_request(&osdc->requests, tid);
down_read(&osdc->lock);
if (!osd_registered(osd)) {
dout("%s osd%d unknown\n", __func__, osd->o_osd);
goto out_unlock_osdc;
}
WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
mutex_lock(&osd->lock);
req = lookup_request(&osd->o_requests, tid);
if (!req) {
dout("%s no tid %llu\n", __func__, tid);
goto out_unlock;
dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
goto out_unlock_session;
}
ceph_osdc_get_request(req);
ret = decode_MOSDOpReply(msg, &m);
if (ret) {
......@@ -2083,7 +2019,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
req, req->r_tid, m.retry_attempt,
req->r_attempts - 1);
goto out_put;
goto out_unlock_session;
}
} else {
WARN_ON(1); /* MOSDOpReply v4 is assumed */
......@@ -2092,22 +2028,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
if (!ceph_oloc_empty(&m.redirect.oloc)) {
dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
m.redirect.oloc.pool);
__unregister_request(osdc, req);
unlink_request(osd, req);
mutex_unlock(&osd->lock);
ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
/*
* Start redirect requests with nofail=true. If
* mapping fails, request will end up on the notarget
* list, waiting for the new osdmap (which can take
* a while), even though the original request mapped
* successfully. In the future we might want to follow
* original request's nofail setting here.
*/
ret = __ceph_osdc_start_request(osdc, req, true);
BUG_ON(ret);
goto out_put;
req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
req->r_tid = 0;
__submit_request(req, false);
goto out_unlock_osdc;
}
if (m.num_ops != req->r_num_ops) {
......@@ -2137,19 +2065,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
req->r_got_reply = true;
} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
dout("req %p tid %llu dup ack\n", req, req->r_tid);
goto out_put;
goto out_unlock_session;
}
if (done_request(req, &m)) {
__unregister_request(osdc, req);
__finish_request(req);
if (req->r_linger) {
WARN_ON(req->r_unsafe_callback);
__register_linger_request(osdc, req);
__register_linger_request(osd, req);
}
}
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
mutex_unlock(&osd->lock);
up_read(&osdc->lock);
if (done_request(req, &m)) {
if (already_acked && req->r_unsafe_callback) {
......@@ -2175,14 +2103,13 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
fail_request:
req->r_result = -EIO;
__unregister_request(osdc, req);
__finish_request(req);
__complete_request(req);
complete_all(&req->r_safe_completion);
out_put:
ceph_osdc_put_request(req);
out_unlock:
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
out_unlock_session:
mutex_unlock(&osd->lock);
out_unlock_osdc:
up_read(&osdc->lock);
}
static void set_pool_was_full(struct ceph_osd_client *osdc)
......@@ -2197,126 +2124,66 @@ static void set_pool_was_full(struct ceph_osd_client *osdc)
}
}
static void reset_changed_osds(struct ceph_osd_client *osdc)
static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
{
struct rb_node *p, *n;
struct ceph_pg_pool_info *pi;
dout("%s %p\n", __func__, osdc);
for (p = rb_first(&osdc->osds); p; p = n) {
struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
if (!pi)
return false;
n = rb_next(p);
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
memcmp(&osd->o_con.peer_addr,
ceph_osd_addr(osdc->osdmap,
osd->o_osd),
sizeof(struct ceph_entity_addr)) != 0)
__reset_osd(osdc, osd);
}
return pi->was_full && !__pool_full(pi);
}
/*
* Requeue requests whose mapping to an OSD has changed. If requests map to
* no osd, request a new map.
*
* Caller should hold map_sem for read.
* Requeue requests whose mapping to an OSD has changed.
*/
static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
bool force_resend_writes)
static void scan_requests(struct ceph_osd *osd,
bool force_resend,
bool cleared_full,
bool check_pool_cleared_full,
struct rb_root *need_resend,
struct list_head *need_resend_linger)
{
struct ceph_osd_request *req, *nreq;
struct rb_node *p;
int needmap = 0;
int err;
bool force_resend_req;
dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
force_resend_writes ? " (force resend writes)" : "");
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; ) {
req = rb_entry(p, struct ceph_osd_request, r_node);
p = rb_next(p);
/*
* For linger requests that have not yet been
* registered, move them to the linger list; they'll
* be sent to the osd in the loop below. Unregister
* the request before re-registering it as a linger
* request to ensure the __map_request() below
* will decide it needs to be sent.
*/
if (req->r_linger && list_empty(&req->r_linger_item)) {
dout("%p tid %llu restart on osd%d\n",
req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
ceph_osdc_get_request(req);
__unregister_request(osdc, req);
__register_linger_request(osdc, req);
ceph_osdc_put_request(req);
continue;
}
force_resend_req = force_resend ||
(force_resend_writes &&
req->r_flags & CEPH_OSD_FLAG_WRITE);
err = __map_request(osdc, req, force_resend_req);
if (err < 0)
continue; /* error */
if (req->r_osd == NULL) {
dout("%p tid %llu maps to no osd\n", req, req->r_tid);
needmap++; /* request a newer map */
} else if (err > 0) {
if (!req->r_linger) {
dout("%p tid %llu requeued on osd%d\n", req,
req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
req->r_flags |= CEPH_OSD_FLAG_RETRY;
}
}
}
list_for_each_entry_safe(req, nreq, &osdc->req_linger,
r_linger_item) {
dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
err = __map_request(osdc, req,
force_resend || force_resend_writes);
dout("__map_request returned %d\n", err);
if (err < 0)
continue; /* hrm! */
if (req->r_osd == NULL || err > 0) {
if (req->r_osd == NULL) {
dout("lingering %p tid %llu maps to no osd\n",
req, req->r_tid);
/*
* A homeless lingering request makes
* no sense, as it's job is to keep
* a particular OSD connection open.
* Request a newer map and kick the
* request, knowing that it won't be
* resent until we actually get a map
* that can tell us where to send it.
*/
needmap++;
}
dout("kicking lingering %p tid %llu osd%d\n", req,
req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
__register_request(osdc, req);
__unregister_linger_request(osdc, req);
struct ceph_osd_client *osdc = osd->o_osdc;
struct rb_node *n;
bool force_resend_writes;
for (n = rb_first(&osd->o_requests); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
enum calc_target_result ct_res;
n = rb_next(n); /* unlink_request() */
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
ct_res = calc_target(osdc, &req->r_t,
&req->r_last_force_resend, false);
switch (ct_res) {
case CALC_TARGET_NO_ACTION:
force_resend_writes = cleared_full ||
(check_pool_cleared_full &&
pool_cleared_full(osdc, req->r_t.base_oloc.pool));
if (!force_resend &&
(!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
!force_resend_writes))
break;
/* fall through */
case CALC_TARGET_NEED_RESEND:
unlink_request(osd, req);
insert_request(need_resend, req);
break;
case CALC_TARGET_POOL_DNE:
break;
}
}
reset_changed_osds(osdc);
mutex_unlock(&osdc->request_mutex);
if (needmap) {
dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc);
}
}
static int handle_one_map(struct ceph_osd_client *osdc,
void *p, void *end, bool incremental)
void *p, void *end, bool incremental,
struct rb_root *need_resend,
struct list_head *need_resend_linger)
{
struct ceph_osdmap *newmap;
struct rb_node *n;
......@@ -2362,11 +2229,51 @@ static int handle_one_map(struct ceph_osd_client *osdc,
}
was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
kick_requests(osdc, skipped_map, was_full);
scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
need_resend, need_resend_linger);
for (n = rb_first(&osdc->osds); n; ) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
n = rb_next(n); /* close_osd() */
scan_requests(osd, skipped_map, was_full, true, need_resend,
need_resend_linger);
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
memcmp(&osd->o_con.peer_addr,
ceph_osd_addr(osdc->osdmap, osd->o_osd),
sizeof(struct ceph_entity_addr)))
close_osd(osd);
}
return 0;
}
static void kick_requests(struct ceph_osd_client *osdc,
struct rb_root *need_resend,
struct list_head *need_resend_linger)
{
struct rb_node *n;
for (n = rb_first(need_resend); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
struct ceph_osd *osd;
n = rb_next(n);
erase_request(need_resend, req); /* before link_request() */
WARN_ON(req->r_osd);
calc_target(osdc, &req->r_t, NULL, false);
osd = lookup_create_osd(osdc, req->r_t.osd, true);
link_request(osd, req);
if (!req->r_linger) {
if (!osd_homeless(osd) && !req->r_t.paused)
send_request(req);
}
}
}
/*
* Process updated osd map.
*
......@@ -2381,13 +2288,15 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
u32 nr_maps, maplen;
u32 epoch;
struct ceph_fsid fsid;
struct rb_root need_resend = RB_ROOT;
LIST_HEAD(need_resend_linger);
bool handled_incremental = false;
bool was_pauserd, was_pausewr;
bool pauserd, pausewr;
int err;
dout("%s have %u\n", __func__, osdc->osdmap->epoch);
down_write(&osdc->map_sem);
down_write(&osdc->lock);
/* verify fsid */
ceph_decode_need(&p, end, sizeof(fsid), bad);
......@@ -2412,7 +2321,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap->epoch + 1 == epoch) {
dout("applying incremental map %u len %d\n",
epoch, maplen);
err = handle_one_map(osdc, p, p + maplen, true);
err = handle_one_map(osdc, p, p + maplen, true,
&need_resend, &need_resend_linger);
if (err)
goto bad;
handled_incremental = true;
......@@ -2443,7 +2353,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap->epoch);
} else {
dout("taking full map %u len %d\n", epoch, maplen);
err = handle_one_map(osdc, p, p + maplen, false);
err = handle_one_map(osdc, p, p + maplen, false,
&need_resend, &need_resend_linger);
if (err)
goto bad;
}
......@@ -2464,20 +2375,60 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
if (was_pauserd || was_pausewr || pauserd || pausewr)
maybe_request_map(osdc);
mutex_lock(&osdc->request_mutex);
__send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
kick_requests(osdc, &need_resend, &need_resend_linger);
ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
osdc->osdmap->epoch);
up_write(&osdc->map_sem);
up_write(&osdc->lock);
wake_up_all(&osdc->client->auth_wq);
return;
bad:
pr_err("osdc handle_map corrupt msg\n");
ceph_msg_dump(msg);
up_write(&osdc->map_sem);
up_write(&osdc->lock);
}
/*
* Resubmit requests pending on the given osd.
*/
static void kick_osd_requests(struct ceph_osd *osd)
{
struct rb_node *n;
for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
if (!req->r_linger) {
if (!req->r_t.paused)
send_request(req);
}
}
}
/*
* If the osd connection drops, we need to resubmit all requests.
*/
static void osd_fault(struct ceph_connection *con)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc;
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
down_write(&osdc->lock);
if (!osd_registered(osd)) {
dout("%s osd%d unknown\n", __func__, osd->o_osd);
goto out_unlock;
}
if (!reopen_osd(osd))
kick_osd_requests(osd);
maybe_request_map(osdc);
out_unlock:
up_write(&osdc->lock);
}
/*
......@@ -2680,17 +2631,11 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
bool nofail)
{
int rc;
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
rc = __ceph_osdc_start_request(osdc, req, nofail);
down_read(&osdc->lock);
submit_request(req, false);
up_read(&osdc->lock);
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
return rc;
return 0;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
......@@ -2703,13 +2648,12 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
mutex_lock(&osdc->request_mutex);
down_write(&osdc->lock);
if (req->r_linger)
__unregister_linger_request(osdc, req);
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
if (req->r_osd)
cancel_request(req);
up_write(&osdc->lock);
}
EXPORT_SYMBOL(ceph_osdc_cancel_request);
......@@ -2744,32 +2688,40 @@ EXPORT_SYMBOL(ceph_osdc_wait_request);
*/
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req;
u64 last_tid, next_tid = 0;
struct rb_node *n, *p;
u64 last_tid = atomic64_read(&osdc->last_tid);
mutex_lock(&osdc->request_mutex);
last_tid = osdc->last_tid;
while (1) {
req = __lookup_request_ge(osdc, next_tid);
if (!req)
break;
if (req->r_tid > last_tid)
break;
again:
down_read(&osdc->lock);
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
mutex_lock(&osd->lock);
for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
struct ceph_osd_request *req =
rb_entry(p, struct ceph_osd_request, r_node);
if (req->r_tid > last_tid)
break;
if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
continue;
next_tid = req->r_tid + 1;
if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
continue;
ceph_osdc_get_request(req);
mutex_unlock(&osd->lock);
up_read(&osdc->lock);
dout("%s waiting on req %p tid %llu last_tid %llu\n",
__func__, req, req->r_tid, last_tid);
wait_for_completion(&req->r_safe_completion);
ceph_osdc_put_request(req);
goto again;
}
ceph_osdc_get_request(req);
mutex_unlock(&osdc->request_mutex);
dout("sync waiting on tid %llu (last is %llu)\n",
req->r_tid, last_tid);
wait_for_completion(&req->r_safe_completion);
mutex_lock(&osdc->request_mutex);
ceph_osdc_put_request(req);
mutex_unlock(&osd->lock);
}
mutex_unlock(&osdc->request_mutex);
dout("sync done (thru tid %llu)\n", last_tid);
up_read(&osdc->lock);
dout("%s done last_tid %llu\n", __func__, last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);
......@@ -2793,18 +2745,14 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
dout("init\n");
osdc->client = client;
init_rwsem(&osdc->map_sem);
mutex_init(&osdc->request_mutex);
osdc->last_tid = 0;
init_rwsem(&osdc->lock);
osdc->osds = RB_ROOT;
INIT_LIST_HEAD(&osdc->osd_lru);
spin_lock_init(&osdc->osd_lru_lock);
osdc->requests = RB_ROOT;
INIT_LIST_HEAD(&osdc->req_lru);
INIT_LIST_HEAD(&osdc->req_unsent);
INIT_LIST_HEAD(&osdc->req_notarget);
INIT_LIST_HEAD(&osdc->req_linger);
osdc->num_requests = 0;
osd_init(&osdc->homeless_osd);
osdc->homeless_osd.o_osdc = osdc;
osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
spin_lock_init(&osdc->event_lock);
......@@ -2861,13 +2809,19 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
cancel_delayed_work_sync(&osdc->timeout_work);
cancel_delayed_work_sync(&osdc->osds_timeout_work);
mutex_lock(&osdc->request_mutex);
down_write(&osdc->lock);
while (!RB_EMPTY_ROOT(&osdc->osds)) {
struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
struct ceph_osd, o_node);
remove_osd(osdc, osd);
close_osd(osd);
}
mutex_unlock(&osdc->request_mutex);
up_write(&osdc->lock);
WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
osd_cleanup(&osdc->homeless_osd);
WARN_ON(!list_empty(&osdc->osd_lru));
WARN_ON(atomic_read(&osdc->num_requests));
WARN_ON(atomic_read(&osdc->num_homeless));
ceph_osdmap_destroy(osdc->osdmap);
mempool_destroy(osdc->req_mempool);
......@@ -2982,19 +2936,15 @@ EXPORT_SYMBOL(ceph_osdc_cleanup);
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc;
struct ceph_osd_client *osdc = osd->o_osdc;
int type = le16_to_cpu(msg->hdr.type);
if (!osd)
goto out;
osdc = osd->o_osdc;
switch (type) {
case CEPH_MSG_OSD_MAP:
ceph_osdc_handle_map(osdc, msg);
break;
case CEPH_MSG_OSD_OPREPLY:
handle_reply(osdc, msg);
handle_reply(osd, msg);
break;
case CEPH_MSG_WATCH_NOTIFY:
handle_watch_notify(osdc, msg);
......@@ -3004,7 +2954,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
pr_err("received unknown message type %d %s\n", type,
ceph_msg_type_name(type));
}
out:
ceph_msg_put(msg);
}
......@@ -3019,21 +2969,27 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc;
struct ceph_msg *m;
struct ceph_msg *m = NULL;
struct ceph_osd_request *req;
int front_len = le32_to_cpu(hdr->front_len);
int data_len = le32_to_cpu(hdr->data_len);
u64 tid;
u64 tid = le64_to_cpu(hdr->tid);
tid = le64_to_cpu(hdr->tid);
mutex_lock(&osdc->request_mutex);
req = lookup_request(&osdc->requests, tid);
down_read(&osdc->lock);
if (!osd_registered(osd)) {
dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
*skip = 1;
goto out_unlock_osdc;
}
WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
mutex_lock(&osd->lock);
req = lookup_request(&osd->o_requests, tid);
if (!req) {
dout("%s osd%d tid %llu unknown, skipping\n", __func__,
osd->o_osd, tid);
m = NULL;
*skip = 1;
goto out;
goto out_unlock_session;
}
ceph_msg_revoke_incoming(req->r_reply);
......@@ -3045,7 +3001,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
false);
if (!m)
goto out;
goto out_unlock_session;
ceph_msg_put(req->r_reply);
req->r_reply = m;
}
......@@ -3056,14 +3012,16 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
req->r_reply->data_length);
m = NULL;
*skip = 1;
goto out;
goto out_unlock_session;
}
m = ceph_msg_get(req->r_reply);
dout("get_reply tid %lld %p\n", tid, m);
out:
mutex_unlock(&osdc->request_mutex);
out_unlock_session:
mutex_unlock(&osd->lock);
out_unlock_osdc:
up_read(&osdc->lock);
return m;
}
......@@ -3083,8 +3041,8 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip);
default:
pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
osd->o_osd);
pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
osd->o_osd, type);
*skip = 1;
return NULL;
}
......@@ -3188,5 +3146,5 @@ static const struct ceph_connection_operations osd_con_ops = {
.alloc_msg = alloc_msg,
.sign_message = osd_sign_message,
.check_message_signature = osd_check_message_signature,
.fault = osd_reset,
.fault = osd_fault,
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment