Commit 922dab61 authored by Ilya Dryomov's avatar Ilya Dryomov

libceph, rbd: ceph_osd_linger_request, watch/notify v2

This adds support and switches rbd to a new, more reliable version of
watch/notify protocol.  As with the OSD client update, this is mostly
about getting the right structures linked into the right places so that
reconnects are properly sent when needed.  watch/notify v2 also
requires sending regular pings to the OSDs - send_linger_ping().

A major change from the old watch/notify implementation is the
introduction of ceph_osd_linger_request - linger requests no longer
piggy back on ceph_osd_request.  ceph_osd_event has been merged into
ceph_osd_linger_request.

All the details are now hidden within libceph, the interface consists
of a simple pair of watch/unwatch functions and ceph_osdc_notify_ack().
ceph_osdc_watch() does return ceph_osd_linger_request, but only to keep
the lifetime management simple.

ceph_osdc_notify_ack() accepts an optional data payload, which is
relayed back to the notifier.

Portions of this patch are loosely based on work by Douglas Fuller
<dfuller@redhat.com> and Mike Christie <michaelc@cs.wisc.edu>.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent c525f036
......@@ -351,11 +351,11 @@ struct rbd_device {
struct rbd_options *opts;
struct ceph_object_id header_oid;
struct ceph_object_locator header_oloc;
struct ceph_file_layout layout;
struct ceph_osd_event *watch_event;
struct rbd_obj_request *watch_request;
struct ceph_osd_linger_request *watch_handle;
struct rbd_spec *parent_spec;
u64 parent_overlap;
......@@ -1596,12 +1596,6 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
return __rbd_obj_request_wait(obj_request, 0);
}
static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request,
unsigned long timeout)
{
return __rbd_obj_request_wait(obj_request, timeout);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
......@@ -1751,12 +1745,6 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
complete_all(&obj_request->completion);
}
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
dout("%s: obj %p\n", __func__, obj_request);
obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = NULL;
......@@ -1877,10 +1865,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
case CEPH_OSD_OP_CALL:
rbd_osd_call_callback(obj_request);
break;
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
rbd_osd_trivial_callback(obj_request);
break;
default:
rbd_warn(NULL, "%s: unsupported op %hu",
obj_request->object_name, (unsigned short) opcode);
......@@ -3100,45 +3084,18 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
obj_request_done_set(obj_request);
}
static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
{
struct rbd_obj_request *obj_request;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int ret;
obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0,
OBJ_REQUEST_NODATA);
if (!obj_request)
return -ENOMEM;
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
ret = -ENOMEM;
obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
obj_request);
if (!obj_request->osd_req)
goto out;
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
notify_id, 0, 0);
rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
goto out;
ret = rbd_obj_request_wait(obj_request);
out:
rbd_obj_request_put(obj_request);
return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
u64 notifier_id, void *data, size_t data_len)
{
struct rbd_device *rbd_dev = (struct rbd_device *)data;
struct rbd_device *rbd_dev = arg;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int ret;
dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
rbd_dev->header_oid.name, (unsigned long long)notify_id,
(unsigned int)opcode);
dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
cookie, notify_id);
/*
* Until adequate refresh error handling is in place, there is
......@@ -3150,63 +3107,31 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
if (ret)
rbd_warn(rbd_dev, "refresh failed: %d", ret);
ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, notify_id, cookie,
NULL, 0);
if (ret)
rbd_warn(rbd_dev, "notify_ack ret %d", ret);
}
/*
* Send a (un)watch request and wait for the ack. Return a request
* with a ref held on success or error.
*/
static struct rbd_obj_request *rbd_obj_watch_request_helper(
struct rbd_device *rbd_dev,
bool watch)
static void rbd_watch_errcb(void *arg, u64 cookie, int err)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_options *opts = osdc->client->options;
struct rbd_obj_request *obj_request;
struct rbd_device *rbd_dev = arg;
int ret;
obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0,
OBJ_REQUEST_NODATA);
if (!obj_request)
return ERR_PTR(-ENOMEM);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
obj_request);
if (!obj_request->osd_req) {
ret = -ENOMEM;
goto out;
}
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie, 0, watch);
rbd_osd_req_format_write(obj_request);
if (watch)
ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
goto out;
rbd_warn(rbd_dev, "encountered watch error: %d", err);
ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout);
if (ret)
goto out;
__rbd_dev_header_unwatch_sync(rbd_dev);
ret = obj_request->result;
ret = rbd_dev_header_watch_sync(rbd_dev);
if (ret) {
if (watch)
rbd_obj_request_end(obj_request);
goto out;
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
return;
}
return obj_request;
out:
rbd_obj_request_put(obj_request);
return ERR_PTR(ret);
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
}
/*
......@@ -3215,57 +3140,33 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
int ret;
struct ceph_osd_linger_request *handle;
rbd_assert(!rbd_dev->watch_event);
rbd_assert(!rbd_dev->watch_request);
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
&rbd_dev->watch_event);
if (ret < 0)
return ret;
obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
if (IS_ERR(obj_request)) {
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
return PTR_ERR(obj_request);
}
rbd_assert(!rbd_dev->watch_handle);
/*
* A watch request is set to linger, so the underlying osd
* request won't go away until we unregister it. We retain
* a pointer to the object request during that time (in
* rbd_dev->watch_request), so we'll keep a reference to it.
* We'll drop that reference after we've unregistered it in
* rbd_dev_header_unwatch_sync().
*/
rbd_dev->watch_request = obj_request;
handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, rbd_watch_cb,
rbd_watch_errcb, rbd_dev);
if (IS_ERR(handle))
return PTR_ERR(handle);
rbd_dev->watch_handle = handle;
return 0;
}
static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
{
struct rbd_obj_request *obj_request;
rbd_assert(rbd_dev->watch_event);
rbd_assert(rbd_dev->watch_request);
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int ret;
rbd_obj_request_end(rbd_dev->watch_request);
rbd_obj_request_put(rbd_dev->watch_request);
rbd_dev->watch_request = NULL;
if (!rbd_dev->watch_handle)
return;
obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
if (!IS_ERR(obj_request))
rbd_obj_request_put(obj_request);
else
rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
PTR_ERR(obj_request));
ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
if (ret)
rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
rbd_dev->watch_handle = NULL;
}
/*
......@@ -4081,6 +3982,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
init_rwsem(&rbd_dev->header_rwsem);
ceph_oid_init(&rbd_dev->header_oid);
ceph_oloc_init(&rbd_dev->header_oloc);
rbd_dev->dev.bus = &rbd_bus_type;
rbd_dev->dev.type = &rbd_device_type;
......@@ -5285,6 +5187,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
if (rbd_dev->image_format == 1)
ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
spec->image_name, RBD_SUFFIX);
......
......@@ -153,8 +153,9 @@ struct ceph_dir_layout {
/* watch-notify operations */
enum {
WATCH_NOTIFY = 1, /* notifying watcher */
WATCH_NOTIFY_COMPLETE = 2, /* notifier notified when done */
CEPH_WATCH_EVENT_NOTIFY = 1, /* notifying watcher */
CEPH_WATCH_EVENT_NOTIFY_COMPLETE = 2, /* notifier notified when done */
CEPH_WATCH_EVENT_DISCONNECT = 3, /* we were disconnected */
};
......
......@@ -34,7 +34,7 @@ struct ceph_osd {
struct rb_node o_node;
struct ceph_connection o_con;
struct rb_root o_requests;
struct list_head o_linger_requests;
struct rb_root o_linger_requests;
struct list_head o_osd_lru;
struct ceph_auth_handshake o_auth;
unsigned long lru_ttl;
......@@ -108,11 +108,12 @@ struct ceph_osd_req_op {
} cls;
struct {
u64 cookie;
u64 ver;
u32 prot_ver;
u32 timeout;
__u8 flag;
__u8 op; /* CEPH_OSD_WATCH_OP_ */
u32 gen;
} watch;
struct {
struct ceph_osd_data request_data;
} notify_ack;
struct {
u64 expected_object_size;
u64 expected_write_size;
......@@ -145,8 +146,6 @@ struct ceph_osd_request_target {
struct ceph_osd_request {
u64 r_tid; /* unique for this client */
struct rb_node r_node;
struct list_head r_linger_item;
struct list_head r_linger_osd_item;
struct ceph_osd *r_osd;
struct ceph_osd_request_target r_t;
......@@ -162,7 +161,6 @@ struct ceph_osd_request {
int r_result;
bool r_got_reply;
int r_linger;
struct ceph_osd_client *r_osdc;
struct kref r_kref;
......@@ -181,6 +179,7 @@ struct ceph_osd_request {
struct ceph_snap_context *r_snapc; /* for writes */
struct timespec r_mtime; /* ditto */
u64 r_data_offset; /* ditto */
bool r_linger; /* don't resend on failure */
/* internal */
unsigned long r_stamp; /* jiffies, send or check time */
......@@ -195,23 +194,40 @@ struct ceph_request_redirect {
struct ceph_object_locator oloc;
};
struct ceph_osd_event {
u64 cookie;
int one_shot;
typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie,
u64 notifier_id, void *data, size_t data_len);
typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err);
struct ceph_osd_linger_request {
struct ceph_osd_client *osdc;
void (*cb)(u64, u64, u8, void *);
void *data;
struct rb_node node;
struct list_head osd_node;
u64 linger_id;
bool committed;
struct ceph_osd *osd;
struct ceph_osd_request *reg_req;
struct ceph_osd_request *ping_req;
unsigned long ping_sent;
struct ceph_osd_request_target t;
u32 last_force_resend;
struct timespec mtime;
struct kref kref;
};
struct mutex lock;
struct rb_node node; /* osd */
struct rb_node osdc_node; /* osdc */
struct list_head scan_item;
struct completion reg_commit_wait;
int reg_commit_error;
int last_error;
u32 register_gen;
struct ceph_osd_event_work {
struct work_struct work;
struct ceph_osd_event *event;
u64 ver;
u64 notify_id;
u8 opcode;
rados_watchcb2_t wcb;
rados_watcherrcb_t errcb;
void *data;
};
struct ceph_osd_client {
......@@ -223,9 +239,10 @@ struct ceph_osd_client {
struct rb_root osds; /* osds */
struct list_head osd_lru; /* idle osds */
spinlock_t osd_lru_lock;
struct list_head req_linger; /* lingering requests */
struct ceph_osd homeless_osd;
atomic64_t last_tid; /* tid of last request */
u64 last_linger_id;
struct rb_root linger_requests; /* lingering requests */
atomic_t num_requests;
atomic_t num_homeless;
struct delayed_work timeout_work;
......@@ -239,10 +256,6 @@ struct ceph_osd_client {
struct ceph_msgpool msgpool_op;
struct ceph_msgpool msgpool_op_reply;
spinlock_t event_lock;
struct rb_root event_tree;
u64 event_count;
struct workqueue_struct *notify_wq;
};
......@@ -314,9 +327,6 @@ extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *name, const void *value,
size_t size, u8 cmp_op, u8 cmp_mode);
extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag);
extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
unsigned int which,
u64 expected_object_size,
......@@ -339,9 +349,6 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
u32 truncate_seq, u64 truncate_size,
bool use_mempool);
extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
extern void ceph_osdc_get_request(struct ceph_osd_request *req);
extern void ceph_osdc_put_request(struct ceph_osd_request *req);
......@@ -372,11 +379,23 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct timespec *mtime,
struct page **pages, int nr_pages);
/* watch/notify events */
extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
void (*event_cb)(u64, u64, u8, void *),
void *data, struct ceph_osd_event **pevent);
extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
extern void ceph_osdc_put_event(struct ceph_osd_event *event);
/* watch/notify */
struct ceph_osd_linger_request *
ceph_osdc_watch(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
rados_watchcb2_t wcb,
rados_watcherrcb_t errcb,
void *data);
int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
struct ceph_osd_linger_request *lreq);
int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
u64 notify_id,
u64 cookie,
void *payload,
size_t payload_len);
#endif
......@@ -427,7 +427,17 @@ enum {
CEPH_OSD_CMPXATTR_MODE_U64 = 2
};
#define RADOS_NOTIFY_VER 1
enum {
CEPH_OSD_WATCH_OP_UNWATCH = 0,
CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
/* note: use only ODD ids to prevent pre-giant code from
interpreting the op as UNWATCH */
CEPH_OSD_WATCH_OP_WATCH = 3,
CEPH_OSD_WATCH_OP_RECONNECT = 5,
CEPH_OSD_WATCH_OP_PING = 7,
};
const char *ceph_osd_watch_op_name(int o);
/*
* an individual object operation. each may be accompanied by some data
......@@ -462,8 +472,9 @@ struct ceph_osd_op {
} __attribute__ ((packed)) snap;
struct {
__le64 cookie;
__le64 ver;
__u8 flag; /* 0 = unwatch, 1 = watch */
__le64 ver; /* no longer used */
__u8 op; /* CEPH_OSD_WATCH_OP_* */
__le32 gen; /* registration generation */
} __attribute__ ((packed)) watch;
struct {
__le64 offset, length;
......
......@@ -27,6 +27,22 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
}
}
const char *ceph_osd_watch_op_name(int o)
{
switch (o) {
case CEPH_OSD_WATCH_OP_UNWATCH:
return "unwatch";
case CEPH_OSD_WATCH_OP_WATCH:
return "watch";
case CEPH_OSD_WATCH_OP_RECONNECT:
return "reconnect";
case CEPH_OSD_WATCH_OP_PING:
return "ping";
default:
return "???";
}
}
const char *ceph_osd_state_name(int s)
{
switch (s) {
......
......@@ -177,6 +177,9 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
ceph_osd_op_name(op->op));
if (op->op == CEPH_OSD_OP_WATCH)
seq_printf(s, "-%s",
ceph_osd_watch_op_name(op->watch.op));
}
seq_putc(s, '\n');
......@@ -197,6 +200,31 @@ static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
mutex_unlock(&osd->lock);
}
static void dump_linger_request(struct seq_file *s,
struct ceph_osd_linger_request *lreq)
{
seq_printf(s, "%llu\t", lreq->linger_id);
dump_target(s, &lreq->t);
seq_printf(s, "\t%u\t%s/%d\n", lreq->register_gen,
lreq->committed ? "C" : "", lreq->last_error);
}
static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
{
struct rb_node *n;
mutex_lock(&osd->lock);
for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
struct ceph_osd_linger_request *lreq =
rb_entry(n, struct ceph_osd_linger_request, node);
dump_linger_request(s, lreq);
}
mutex_unlock(&osd->lock);
}
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
......@@ -214,6 +242,14 @@ static int osdc_show(struct seq_file *s, void *pp)
}
dump_requests(s, &osdc->homeless_osd);
seq_puts(s, "LINGER REQUESTS\n");
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
dump_linger_requests(s, osd);
}
dump_linger_requests(s, &osdc->homeless_osd);
up_read(&osdc->lock);
return 0;
}
......
......@@ -45,6 +45,10 @@ static const struct ceph_connection_operations osd_con_ops;
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq);
#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
......@@ -74,10 +78,15 @@ static inline void verify_osd_locked(struct ceph_osd *osd)
rwsem_is_locked(&osdc->lock)) &&
!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif
/*
......@@ -322,6 +331,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
case CEPH_OSD_OP_STAT:
ceph_osd_data_release(&op->raw_data_in);
break;
case CEPH_OSD_OP_NOTIFY_ACK:
ceph_osd_data_release(&op->notify_ack.request_data);
break;
default:
break;
}
......@@ -345,6 +357,29 @@ static void target_init(struct ceph_osd_request_target *t)
t->osd = CEPH_HOMELESS_OSD;
}
static void target_copy(struct ceph_osd_request_target *dest,
const struct ceph_osd_request_target *src)
{
ceph_oid_copy(&dest->base_oid, &src->base_oid);
ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
ceph_oid_copy(&dest->target_oid, &src->target_oid);
ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
dest->pgid = src->pgid; /* struct */
dest->pg_num = src->pg_num;
dest->pg_num_mask = src->pg_num_mask;
ceph_osds_copy(&dest->acting, &src->acting);
ceph_osds_copy(&dest->up, &src->up);
dest->size = src->size;
dest->min_size = src->min_size;
dest->sort_bitwise = src->sort_bitwise;
dest->flags = src->flags;
dest->paused = src->paused;
dest->osd = src->osd;
}
static void target_destroy(struct ceph_osd_request_target *t)
{
ceph_oid_destroy(&t->base_oid);
......@@ -357,8 +392,6 @@ static void target_destroy(struct ceph_osd_request_target *t)
static void request_release_checks(struct ceph_osd_request *req)
{
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
WARN_ON(!list_empty(&req->r_linger_item));
WARN_ON(!list_empty(&req->r_linger_osd_item));
WARN_ON(!list_empty(&req->r_unsafe_item));
WARN_ON(req->r_osd);
}
......@@ -419,13 +452,48 @@ static void request_init(struct ceph_osd_request *req)
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd_item);
INIT_LIST_HEAD(&req->r_unsafe_item);
target_init(&req->r_t);
}
/*
* This is ugly, but it allows us to reuse linger registration and ping
* requests, keeping the structure of the code around send_linger{_ping}()
* reasonable. Setting up a min_nr=2 mempool for each linger request
* and dealing with copying ops (this blasts req only, watch op remains
* intact) isn't any better.
*/
static void request_reinit(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
bool mempool = req->r_mempool;
unsigned int num_ops = req->r_num_ops;
u64 snapid = req->r_snapid;
struct ceph_snap_context *snapc = req->r_snapc;
bool linger = req->r_linger;
struct ceph_msg *request_msg = req->r_request;
struct ceph_msg *reply_msg = req->r_reply;
dout("%s req %p\n", __func__, req);
WARN_ON(atomic_read(&req->r_kref.refcount) != 1);
request_release_checks(req);
WARN_ON(atomic_read(&request_msg->kref.refcount) != 1);
WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1);
target_destroy(&req->r_t);
request_init(req);
req->r_osdc = osdc;
req->r_mempool = mempool;
req->r_num_ops = num_ops;
req->r_snapid = snapid;
req->r_snapc = snapc;
req->r_linger = linger;
req->r_request = request_msg;
req->r_reply = reply_msg;
}
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
struct ceph_snap_context *snapc,
unsigned int num_ops,
......@@ -681,21 +749,19 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
}
EXPORT_SYMBOL(osd_req_op_xattr_init);
void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
/*
* @watch_opcode: CEPH_OSD_WATCH_OP_*
*/
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
u64 cookie, u8 watch_opcode)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
opcode, 0);
BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
struct ceph_osd_req_op *op;
op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
op->watch.cookie = cookie;
op->watch.ver = version;
if (opcode == CEPH_OSD_OP_WATCH && flag)
op->watch.flag = (u8)1;
op->watch.op = watch_opcode;
op->watch.gen = 0;
}
EXPORT_SYMBOL(osd_req_op_watch_init);
void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
unsigned int which,
......@@ -771,11 +837,13 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
break;
case CEPH_OSD_OP_STARTSYNC:
break;
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
dst->watch.cookie = cpu_to_le64(src->watch.cookie);
dst->watch.ver = cpu_to_le64(src->watch.ver);
dst->watch.flag = src->watch.flag;
dst->watch.ver = cpu_to_le64(0);
dst->watch.op = src->watch.op;
dst->watch.gen = cpu_to_le32(src->watch.gen);
break;
case CEPH_OSD_OP_NOTIFY_ACK:
break;
case CEPH_OSD_OP_SETALLOCHINT:
dst->alloc_hint.expected_object_size =
......@@ -915,7 +983,7 @@ static void osd_init(struct ceph_osd *osd)
atomic_set(&osd->o_ref, 1);
RB_CLEAR_NODE(&osd->o_node);
osd->o_requests = RB_ROOT;
INIT_LIST_HEAD(&osd->o_linger_requests);
osd->o_linger_requests = RB_ROOT;
INIT_LIST_HEAD(&osd->o_osd_lru);
INIT_LIST_HEAD(&osd->o_keepalive_item);
osd->o_incarnation = 1;
......@@ -926,7 +994,7 @@ static void osd_cleanup(struct ceph_osd *osd)
{
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
WARN_ON(!list_empty(&osd->o_osd_lru));
WARN_ON(!list_empty(&osd->o_keepalive_item));
......@@ -996,7 +1064,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd)
static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests))
RB_EMPTY_ROOT(&osd->o_linger_requests))
__move_osd_to_lru(osd);
}
......@@ -1036,6 +1104,17 @@ static void close_osd(struct ceph_osd *osd)
unlink_request(osd, req);
link_request(&osdc->homeless_osd, req);
}
for (n = rb_first(&osd->o_linger_requests); n; ) {
struct ceph_osd_linger_request *lreq =
rb_entry(n, struct ceph_osd_linger_request, node);
n = rb_next(n); /* unlink_linger() */
dout(" reassigning lreq %p linger_id %llu\n", lreq,
lreq->linger_id);
unlink_linger(osd, lreq);
link_linger(&osdc->homeless_osd, lreq);
}
__remove_osd_from_lru(osd);
erase_osd(&osdc->osds, osd);
......@@ -1052,7 +1131,7 @@ static int reopen_osd(struct ceph_osd *osd)
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
RB_EMPTY_ROOT(&osd->o_linger_requests)) {
close_osd(osd);
return -ENODEV;
}
......@@ -1148,52 +1227,6 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
atomic_dec(&osd->o_osdc->num_homeless);
}
static void __register_linger_request(struct ceph_osd *osd,
struct ceph_osd_request *req)
{
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
WARN_ON(!req->r_linger);
ceph_osdc_get_request(req);
list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
__remove_osd_from_lru(osd);
req->r_osd = osd;
}
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
WARN_ON(!req->r_linger);
if (list_empty(&req->r_linger_item)) {
dout("%s %p tid %llu not registered\n", __func__, req,
req->r_tid);
return;
}
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
list_del_init(&req->r_linger_item);
if (req->r_osd) {
list_del_init(&req->r_linger_osd_item);
maybe_move_osd_to_lru(req->r_osd);
if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
req->r_osd = NULL;
}
ceph_osdc_put_request(req);
}
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
if (!req->r_linger) {
dout("set_request_linger %p\n", req);
req->r_linger = 1;
}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);
static bool __pool_full(struct ceph_pg_pool_info *pi)
{
return pi->flags & CEPH_POOL_FLAG_FULL;
......@@ -1379,6 +1412,10 @@ static void setup_request_data(struct ceph_osd_request *req,
op->xattr.value_len);
ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
break;
case CEPH_OSD_OP_NOTIFY_ACK:
ceph_osdc_msg_data_add(msg,
&op->notify_ack.request_data);
break;
/* reply */
case CEPH_OSD_OP_STAT:
......@@ -1683,6 +1720,460 @@ static void cancel_request(struct ceph_osd_request *req)
finish_request(req);
}
/*
* lingering requests, watch/notify v2 infrastructure
*/
static void linger_release(struct kref *kref)
{
struct ceph_osd_linger_request *lreq =
container_of(kref, struct ceph_osd_linger_request, kref);
dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
lreq->reg_req, lreq->ping_req);
WARN_ON(!RB_EMPTY_NODE(&lreq->node));
WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
WARN_ON(!list_empty(&lreq->scan_item));
WARN_ON(lreq->osd);
if (lreq->reg_req)
ceph_osdc_put_request(lreq->reg_req);
if (lreq->ping_req)
ceph_osdc_put_request(lreq->ping_req);
target_destroy(&lreq->t);
kfree(lreq);
}
static void linger_put(struct ceph_osd_linger_request *lreq)
{
if (lreq)
kref_put(&lreq->kref, linger_release);
}
static struct ceph_osd_linger_request *
linger_get(struct ceph_osd_linger_request *lreq)
{
kref_get(&lreq->kref);
return lreq;
}
static struct ceph_osd_linger_request *
linger_alloc(struct ceph_osd_client *osdc)
{
struct ceph_osd_linger_request *lreq;
lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
if (!lreq)
return NULL;
kref_init(&lreq->kref);
mutex_init(&lreq->lock);
RB_CLEAR_NODE(&lreq->node);
RB_CLEAR_NODE(&lreq->osdc_node);
INIT_LIST_HEAD(&lreq->scan_item);
init_completion(&lreq->reg_commit_wait);
lreq->osdc = osdc;
target_init(&lreq->t);
dout("%s lreq %p\n", __func__, lreq);
return lreq;
}
DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
/*
* Create linger request <-> OSD session relation.
*
* @lreq has to be registered, @osd may be homeless.
*/
static void link_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq)
{
verify_osd_locked(osd);
WARN_ON(!lreq->linger_id || lreq->osd);
dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
osd->o_osd, lreq, lreq->linger_id);
if (!osd_homeless(osd))
__remove_osd_from_lru(osd);
else
atomic_inc(&osd->o_osdc->num_homeless);
get_osd(osd);
insert_linger(&osd->o_linger_requests, lreq);
lreq->osd = osd;
}
static void unlink_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq)
{
verify_osd_locked(osd);
WARN_ON(lreq->osd != osd);
dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
osd->o_osd, lreq, lreq->linger_id);
lreq->osd = NULL;
erase_linger(&osd->o_linger_requests, lreq);
put_osd(osd);
if (!osd_homeless(osd))
maybe_move_osd_to_lru(osd);
else
atomic_dec(&osd->o_osdc->num_homeless);
}
static bool __linger_registered(struct ceph_osd_linger_request *lreq)
{
verify_osdc_locked(lreq->osdc);
return !RB_EMPTY_NODE(&lreq->osdc_node);
}
static bool linger_registered(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_client *osdc = lreq->osdc;
bool registered;
down_read(&osdc->lock);
registered = __linger_registered(lreq);
up_read(&osdc->lock);
return registered;
}
static void linger_register(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_client *osdc = lreq->osdc;
verify_osdc_wrlocked(osdc);
WARN_ON(lreq->linger_id);
linger_get(lreq);
lreq->linger_id = ++osdc->last_linger_id;
insert_linger_osdc(&osdc->linger_requests, lreq);
}
static void linger_unregister(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_client *osdc = lreq->osdc;
verify_osdc_wrlocked(osdc);
erase_linger_osdc(&osdc->linger_requests, lreq);
linger_put(lreq);
}
static void cancel_linger_request(struct ceph_osd_request *req)
{
struct ceph_osd_linger_request *lreq = req->r_priv;
WARN_ON(!req->r_linger);
cancel_request(req);
linger_put(lreq);
}
struct linger_work {
struct work_struct work;
struct ceph_osd_linger_request *lreq;
union {
struct {
u64 notify_id;
u64 notifier_id;
void *payload; /* points into @msg front */
size_t payload_len;
struct ceph_msg *msg; /* for ceph_msg_put() */
} notify;
struct {
int err;
} error;
};
};
static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
work_func_t workfn)
{
struct linger_work *lwork;
lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
if (!lwork)
return NULL;
INIT_WORK(&lwork->work, workfn);
lwork->lreq = linger_get(lreq);
return lwork;
}
static void lwork_free(struct linger_work *lwork)
{
struct ceph_osd_linger_request *lreq = lwork->lreq;
linger_put(lreq);
kfree(lwork);
}
static void lwork_queue(struct linger_work *lwork)
{
struct ceph_osd_linger_request *lreq = lwork->lreq;
struct ceph_osd_client *osdc = lreq->osdc;
verify_lreq_locked(lreq);
queue_work(osdc->notify_wq, &lwork->work);
}
static void do_watch_notify(struct work_struct *w)
{
struct linger_work *lwork = container_of(w, struct linger_work, work);
struct ceph_osd_linger_request *lreq = lwork->lreq;
if (!linger_registered(lreq)) {
dout("%s lreq %p not registered\n", __func__, lreq);
goto out;
}
dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
__func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
lwork->notify.payload_len);
lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
lwork->notify.notifier_id, lwork->notify.payload,
lwork->notify.payload_len);
out:
ceph_msg_put(lwork->notify.msg);
lwork_free(lwork);
}
static void do_watch_error(struct work_struct *w)
{
struct linger_work *lwork = container_of(w, struct linger_work, work);
struct ceph_osd_linger_request *lreq = lwork->lreq;
if (!linger_registered(lreq)) {
dout("%s lreq %p not registered\n", __func__, lreq);
goto out;
}
dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
out:
lwork_free(lwork);
}
static void queue_watch_error(struct ceph_osd_linger_request *lreq)
{
struct linger_work *lwork;
lwork = lwork_alloc(lreq, do_watch_error);
if (!lwork) {
pr_err("failed to allocate error-lwork\n");
return;
}
lwork->error.err = lreq->last_error;
lwork_queue(lwork);
}
static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
int result)
{
if (!completion_done(&lreq->reg_commit_wait)) {
lreq->reg_commit_error = (result <= 0 ? result : 0);
complete_all(&lreq->reg_commit_wait);
}
}
static void linger_commit_cb(struct ceph_osd_request *req)
{
struct ceph_osd_linger_request *lreq = req->r_priv;
mutex_lock(&lreq->lock);
dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
lreq->linger_id, req->r_result);
WARN_ON(!__linger_registered(lreq));
linger_reg_commit_complete(lreq, req->r_result);
lreq->committed = true;
mutex_unlock(&lreq->lock);
linger_put(lreq);
}
static int normalize_watch_error(int err)
{
/*
* Translate ENOENT -> ENOTCONN so that a delete->disconnection
* notification and a failure to reconnect because we raced with
* the delete appear the same to the user.
*/
if (err == -ENOENT)
err = -ENOTCONN;
return err;
}
static void linger_reconnect_cb(struct ceph_osd_request *req)
{
struct ceph_osd_linger_request *lreq = req->r_priv;
mutex_lock(&lreq->lock);
dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
lreq, lreq->linger_id, req->r_result, lreq->last_error);
if (req->r_result < 0) {
if (!lreq->last_error) {
lreq->last_error = normalize_watch_error(req->r_result);
queue_watch_error(lreq);
}
}
mutex_unlock(&lreq->lock);
linger_put(lreq);
}
static void send_linger(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_request *req = lreq->reg_req;
struct ceph_osd_req_op *op = &req->r_ops[0];
verify_osdc_wrlocked(req->r_osdc);
dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
if (req->r_osd)
cancel_linger_request(req);
request_reinit(req);
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
req->r_flags = lreq->t.flags;
req->r_mtime = lreq->mtime;
mutex_lock(&lreq->lock);
if (lreq->committed) {
WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
op->watch.cookie != lreq->linger_id);
op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
op->watch.gen = ++lreq->register_gen;
dout("lreq %p reconnect register_gen %u\n", lreq,
op->watch.gen);
req->r_callback = linger_reconnect_cb;
} else {
WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
dout("lreq %p register\n", lreq);
req->r_callback = linger_commit_cb;
}
mutex_unlock(&lreq->lock);
req->r_priv = linger_get(lreq);
req->r_linger = true;
submit_request(req, true);
}
static void linger_ping_cb(struct ceph_osd_request *req)
{
struct ceph_osd_linger_request *lreq = req->r_priv;
mutex_lock(&lreq->lock);
dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
__func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
lreq->last_error);
if (lreq->register_gen == req->r_ops[0].watch.gen) {
if (req->r_result && !lreq->last_error) {
lreq->last_error = normalize_watch_error(req->r_result);
queue_watch_error(lreq);
}
} else {
dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
lreq->register_gen, req->r_ops[0].watch.gen);
}
mutex_unlock(&lreq->lock);
linger_put(lreq);
}
static void send_linger_ping(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_client *osdc = lreq->osdc;
struct ceph_osd_request *req = lreq->ping_req;
struct ceph_osd_req_op *op = &req->r_ops[0];
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
dout("%s PAUSERD\n", __func__);
return;
}
lreq->ping_sent = jiffies;
dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
__func__, lreq, lreq->linger_id, lreq->ping_sent,
lreq->register_gen);
if (req->r_osd)
cancel_linger_request(req);
request_reinit(req);
target_copy(&req->r_t, &lreq->t);
WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
op->watch.cookie != lreq->linger_id ||
op->watch.op != CEPH_OSD_WATCH_OP_PING);
op->watch.gen = lreq->register_gen;
req->r_callback = linger_ping_cb;
req->r_priv = linger_get(lreq);
req->r_linger = true;
ceph_osdc_get_request(req);
account_request(req);
req->r_tid = atomic64_inc_return(&osdc->last_tid);
link_request(lreq->osd, req);
send_request(req);
}
static void linger_submit(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_client *osdc = lreq->osdc;
struct ceph_osd *osd;
calc_target(osdc, &lreq->t, &lreq->last_force_resend, false);
osd = lookup_create_osd(osdc, lreq->t.osd, true);
link_linger(osd, lreq);
send_linger(lreq);
}
/*
* @lreq has to be both registered and linked.
*/
static void __linger_cancel(struct ceph_osd_linger_request *lreq)
{
if (lreq->ping_req->r_osd)
cancel_linger_request(lreq->ping_req);
if (lreq->reg_req->r_osd)
cancel_linger_request(lreq->reg_req);
unlink_linger(lreq->osd, lreq);
linger_unregister(lreq);
}
static void linger_cancel(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_client *osdc = lreq->osdc;
down_write(&osdc->lock);
if (__linger_registered(lreq))
__linger_cancel(lreq);
up_write(&osdc->lock);
}
static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
{
int ret;
dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
return ret ?: lreq->reg_commit_error;
}
/*
* Timeout callback, called every N seconds. When 1 or more OSD
* requests has been active for more than N seconds, we send a keepalive
......@@ -1720,6 +2211,19 @@ static void handle_timeout(struct work_struct *work)
found = true;
}
}
for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
struct ceph_osd_linger_request *lreq =
rb_entry(p, struct ceph_osd_linger_request, node);
dout(" lreq %p linger_id %llu is served by osd%d\n",
lreq, lreq->linger_id, osd->o_osd);
found = true;
mutex_lock(&lreq->lock);
if (lreq->committed && !lreq->last_error)
send_linger_ping(lreq);
mutex_unlock(&lreq->lock);
}
if (found)
list_move_tail(&osd->o_keepalive_item, &slow_osds);
......@@ -1756,7 +2260,7 @@ static void handle_osds_timeout(struct work_struct *work)
break;
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
close_osd(osd);
}
......@@ -2082,7 +2586,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
__finish_request(req);
if (req->r_linger) {
WARN_ON(req->r_unsafe_callback);
__register_linger_request(osd, req);
dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
__complete_request(req);
}
}
......@@ -2093,7 +2598,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
if (already_acked && req->r_unsafe_callback) {
dout("req %p tid %llu safe-cb\n", req, req->r_tid);
req->r_unsafe_callback(req, false);
} else {
} else if (!req->r_linger) {
dout("req %p tid %llu cb\n", req, req->r_tid);
__complete_request(req);
}
......@@ -2145,6 +2650,26 @@ static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
return pi->was_full && !__pool_full(pi);
}
static enum calc_target_result
recalc_linger_target(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_client *osdc = lreq->osdc;
enum calc_target_result ct_res;
ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true);
if (ct_res == CALC_TARGET_NEED_RESEND) {
struct ceph_osd *osd;
osd = lookup_create_osd(osdc, lreq->t.osd, true);
if (osd != lreq->osd) {
unlink_linger(lreq->osd, lreq);
link_linger(osd, lreq);
}
}
return ct_res;
}
/*
* Requeue requests whose mapping to an OSD has changed.
*/
......@@ -2159,6 +2684,39 @@ static void scan_requests(struct ceph_osd *osd,
struct rb_node *n;
bool force_resend_writes;
for (n = rb_first(&osd->o_linger_requests); n; ) {
struct ceph_osd_linger_request *lreq =
rb_entry(n, struct ceph_osd_linger_request, node);
enum calc_target_result ct_res;
n = rb_next(n); /* recalc_linger_target() */
dout("%s lreq %p linger_id %llu\n", __func__, lreq,
lreq->linger_id);
ct_res = recalc_linger_target(lreq);
switch (ct_res) {
case CALC_TARGET_NO_ACTION:
force_resend_writes = cleared_full ||
(check_pool_cleared_full &&
pool_cleared_full(osdc, lreq->t.base_oloc.pool));
if (!force_resend && !force_resend_writes)
break;
/* fall through */
case CALC_TARGET_NEED_RESEND:
/*
* scan_requests() for the previous epoch(s)
* may have already added it to the list, since
* it's not unlinked here.
*/
if (list_empty(&lreq->scan_item))
list_add_tail(&lreq->scan_item, need_resend_linger);
break;
case CALC_TARGET_POOL_DNE:
break;
}
}
for (n = rb_first(&osd->o_requests); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
......@@ -2263,6 +2821,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
struct rb_root *need_resend,
struct list_head *need_resend_linger)
{
struct ceph_osd_linger_request *lreq, *nlreq;
struct rb_node *n;
for (n = rb_first(need_resend); n; ) {
......@@ -2280,8 +2839,17 @@ static void kick_requests(struct ceph_osd_client *osdc,
if (!req->r_linger) {
if (!osd_homeless(osd) && !req->r_t.paused)
send_request(req);
} else {
cancel_linger_request(req);
}
}
list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
if (!osd_homeless(lreq->osd))
send_linger(lreq);
list_del_init(&lreq->scan_item);
}
}
/*
......@@ -2406,15 +2974,25 @@ static void kick_osd_requests(struct ceph_osd *osd)
{
struct rb_node *n;
for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
for (n = rb_first(&osd->o_requests); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
n = rb_next(n); /* cancel_linger_request() */
if (!req->r_linger) {
if (!req->r_t.paused)
send_request(req);
} else {
cancel_linger_request(req);
}
}
for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
struct ceph_osd_linger_request *lreq =
rb_entry(n, struct ceph_osd_linger_request, node);
send_linger(lreq);
}
}
/*
......@@ -2441,193 +3019,77 @@ static void osd_fault(struct ceph_connection *con)
up_write(&osdc->lock);
}
/*
* watch/notify callback event infrastructure
*
* These callbacks are used both for watch and notify operations.
*/
static void __release_event(struct kref *kref)
{
struct ceph_osd_event *event =
container_of(kref, struct ceph_osd_event, kref);
dout("__release_event %p\n", event);
kfree(event);
}
static void get_event(struct ceph_osd_event *event)
{
kref_get(&event->kref);
}
void ceph_osdc_put_event(struct ceph_osd_event *event)
{
kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);
static void __insert_event(struct ceph_osd_client *osdc,
struct ceph_osd_event *new)
{
struct rb_node **p = &osdc->event_tree.rb_node;
struct rb_node *parent = NULL;
struct ceph_osd_event *event = NULL;
while (*p) {
parent = *p;
event = rb_entry(parent, struct ceph_osd_event, node);
if (new->cookie < event->cookie)
p = &(*p)->rb_left;
else if (new->cookie > event->cookie)
p = &(*p)->rb_right;
else
BUG();
}
rb_link_node(&new->node, parent, p);
rb_insert_color(&new->node, &osdc->event_tree);
}
static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
u64 cookie)
{
struct rb_node **p = &osdc->event_tree.rb_node;
struct rb_node *parent = NULL;
struct ceph_osd_event *event = NULL;
while (*p) {
parent = *p;
event = rb_entry(parent, struct ceph_osd_event, node);
if (cookie < event->cookie)
p = &(*p)->rb_left;
else if (cookie > event->cookie)
p = &(*p)->rb_right;
else
return event;
}
return NULL;
}
static void __remove_event(struct ceph_osd_event *event)
{
struct ceph_osd_client *osdc = event->osdc;
if (!RB_EMPTY_NODE(&event->node)) {
dout("__remove_event removed %p\n", event);
rb_erase(&event->node, &osdc->event_tree);
ceph_osdc_put_event(event);
} else {
dout("__remove_event didn't remove %p\n", event);
}
}
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
void (*event_cb)(u64, u64, u8, void *),
void *data, struct ceph_osd_event **pevent)
{
struct ceph_osd_event *event;
event = kmalloc(sizeof(*event), GFP_NOIO);
if (!event)
return -ENOMEM;
dout("create_event %p\n", event);
event->cb = event_cb;
event->one_shot = 0;
event->data = data;
event->osdc = osdc;
INIT_LIST_HEAD(&event->osd_node);
RB_CLEAR_NODE(&event->node);
kref_init(&event->kref); /* one ref for us */
kref_get(&event->kref); /* one ref for the caller */
spin_lock(&osdc->event_lock);
event->cookie = ++osdc->event_count;
__insert_event(osdc, event);
spin_unlock(&osdc->event_lock);
*pevent = event;
return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);
void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
struct ceph_osd_client *osdc = event->osdc;
dout("cancel_event %p\n", event);
spin_lock(&osdc->event_lock);
__remove_event(event);
spin_unlock(&osdc->event_lock);
ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);
static void do_event_work(struct work_struct *work)
{
struct ceph_osd_event_work *event_work =
container_of(work, struct ceph_osd_event_work, work);
struct ceph_osd_event *event = event_work->event;
u64 ver = event_work->ver;
u64 notify_id = event_work->notify_id;
u8 opcode = event_work->opcode;
dout("do_event_work completing %p\n", event);
event->cb(ver, notify_id, opcode, event->data);
dout("do_event_work completed %p\n", event);
ceph_osdc_put_event(event);
kfree(event_work);
}
/*
* Process osd watch notifications
*/
static void handle_watch_notify(struct ceph_osd_client *osdc,
struct ceph_msg *msg)
{
void *p, *end;
u8 proto_ver;
u64 cookie, ver, notify_id;
u8 opcode;
struct ceph_osd_event *event;
struct ceph_osd_event_work *event_work;
p = msg->front.iov_base;
end = p + msg->front.iov_len;
void *p = msg->front.iov_base;
void *const end = p + msg->front.iov_len;
struct ceph_osd_linger_request *lreq;
struct linger_work *lwork;
u8 proto_ver, opcode;
u64 cookie, notify_id;
u64 notifier_id = 0;
void *payload = NULL;
u32 payload_len = 0;
ceph_decode_8_safe(&p, end, proto_ver, bad);
ceph_decode_8_safe(&p, end, opcode, bad);
ceph_decode_64_safe(&p, end, cookie, bad);
ceph_decode_64_safe(&p, end, ver, bad);
p += 8; /* skip ver */
ceph_decode_64_safe(&p, end, notify_id, bad);
spin_lock(&osdc->event_lock);
event = __find_event(osdc, cookie);
if (event) {
BUG_ON(event->one_shot);
get_event(event);
}
spin_unlock(&osdc->event_lock);
dout("handle_watch_notify cookie %lld ver %lld event %p\n",
cookie, ver, event);
if (event) {
event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
if (!event_work) {
pr_err("couldn't allocate event_work\n");
ceph_osdc_put_event(event);
return;
if (proto_ver >= 1) {
ceph_decode_32_safe(&p, end, payload_len, bad);
ceph_decode_need(&p, end, payload_len, bad);
payload = p;
p += payload_len;
}
if (le16_to_cpu(msg->hdr.version) >= 2)
p += 4; /* skip return_code */
if (le16_to_cpu(msg->hdr.version) >= 3)
ceph_decode_64_safe(&p, end, notifier_id, bad);
down_read(&osdc->lock);
lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
if (!lreq) {
dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
cookie);
goto out_unlock_osdc;
}
mutex_lock(&lreq->lock);
dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie,
lreq);
if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
if (!lreq->last_error) {
lreq->last_error = -ENOTCONN;
queue_watch_error(lreq);
}
} else {
/* CEPH_WATCH_EVENT_NOTIFY */
lwork = lwork_alloc(lreq, do_watch_notify);
if (!lwork) {
pr_err("failed to allocate notify-lwork\n");
goto out_unlock_lreq;
}
INIT_WORK(&event_work->work, do_event_work);
event_work->event = event;
event_work->ver = ver;
event_work->notify_id = notify_id;
event_work->opcode = opcode;
queue_work(osdc->notify_wq, &event_work->work);
lwork->notify.notify_id = notify_id;
lwork->notify.notifier_id = notifier_id;
lwork->notify.payload = payload;
lwork->notify.payload_len = payload_len;
lwork->notify.msg = ceph_msg_get(msg);
lwork_queue(lwork);
}
out_unlock_lreq:
mutex_unlock(&lreq->lock);
out_unlock_osdc:
up_read(&osdc->lock);
return;
bad:
......@@ -2659,8 +3121,6 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req)
struct ceph_osd_client *osdc = req->r_osdc;
down_write(&osdc->lock);
if (req->r_linger)
__unregister_linger_request(osdc, req);
if (req->r_osd)
cancel_request(req);
up_write(&osdc->lock);
......@@ -2743,6 +3203,198 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc)
}
EXPORT_SYMBOL(ceph_osdc_sync);
static struct ceph_osd_request *
alloc_linger_request(struct ceph_osd_linger_request *lreq)
{
struct ceph_osd_request *req;
req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
if (!req)
return NULL;
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
ceph_osdc_put_request(req);
return NULL;
}
return req;
}
/*
* Returns a handle, caller owns a ref.
*/
struct ceph_osd_linger_request *
ceph_osdc_watch(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
rados_watchcb2_t wcb,
rados_watcherrcb_t errcb,
void *data)
{
struct ceph_osd_linger_request *lreq;
int ret;
lreq = linger_alloc(osdc);
if (!lreq)
return ERR_PTR(-ENOMEM);
lreq->wcb = wcb;
lreq->errcb = errcb;
lreq->data = data;
ceph_oid_copy(&lreq->t.base_oid, oid);
ceph_oloc_copy(&lreq->t.base_oloc, oloc);
lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
lreq->mtime = CURRENT_TIME;
lreq->reg_req = alloc_linger_request(lreq);
if (!lreq->reg_req) {
ret = -ENOMEM;
goto err_put_lreq;
}
lreq->ping_req = alloc_linger_request(lreq);
if (!lreq->ping_req) {
ret = -ENOMEM;
goto err_put_lreq;
}
down_write(&osdc->lock);
linger_register(lreq); /* before osd_req_op_* */
osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
CEPH_OSD_WATCH_OP_WATCH);
osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
CEPH_OSD_WATCH_OP_PING);
linger_submit(lreq);
up_write(&osdc->lock);
ret = linger_reg_commit_wait(lreq);
if (ret) {
linger_cancel(lreq);
goto err_put_lreq;
}
return lreq;
err_put_lreq:
linger_put(lreq);
return ERR_PTR(ret);
}
EXPORT_SYMBOL(ceph_osdc_watch);
/*
* Releases a ref.
*
* Times out after mount_timeout to preserve rbd unmap behaviour
* introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
* with mount_timeout").
*/
int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
struct ceph_osd_linger_request *lreq)
{
struct ceph_options *opts = osdc->client->options;
struct ceph_osd_request *req;
int ret;
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
if (!req)
return -ENOMEM;
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
req->r_mtime = CURRENT_TIME;
osd_req_op_watch_init(req, 0, lreq->linger_id,
CEPH_OSD_WATCH_OP_UNWATCH);
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
ceph_osdc_start_request(osdc, req, false);
linger_cancel(lreq);
linger_put(lreq);
ret = wait_request_timeout(req, opts->mount_timeout);
out_put_req:
ceph_osdc_put_request(req);
return ret;
}
EXPORT_SYMBOL(ceph_osdc_unwatch);
static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
u64 notify_id, u64 cookie, void *payload,
size_t payload_len)
{
struct ceph_osd_req_op *op;
struct ceph_pagelist *pl;
int ret;
op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
pl = kmalloc(sizeof(*pl), GFP_NOIO);
if (!pl)
return -ENOMEM;
ceph_pagelist_init(pl);
ret = ceph_pagelist_encode_64(pl, notify_id);
ret |= ceph_pagelist_encode_64(pl, cookie);
if (payload) {
ret |= ceph_pagelist_encode_32(pl, payload_len);
ret |= ceph_pagelist_append(pl, payload, payload_len);
} else {
ret |= ceph_pagelist_encode_32(pl, 0);
}
if (ret) {
ceph_pagelist_release(pl);
return -ENOMEM;
}
ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
op->indata_len = pl->length;
return 0;
}
int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
u64 notify_id,
u64 cookie,
void *payload,
size_t payload_len)
{
struct ceph_osd_request *req;
int ret;
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
if (!req)
return -ENOMEM;
ceph_oid_copy(&req->r_base_oid, oid);
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ;
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
payload_len);
if (ret)
goto out_put_req;
ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
out_put_req:
ceph_osdc_put_request(req);
return ret;
}
EXPORT_SYMBOL(ceph_osdc_notify_ack);
/*
* Call all pending notify callbacks - for use after a watch is
* unregistered, to make sure no more callbacks for it will be invoked
......@@ -2767,15 +3419,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
osdc->osds = RB_ROOT;
INIT_LIST_HEAD(&osdc->osd_lru);
spin_lock_init(&osdc->osd_lru_lock);
INIT_LIST_HEAD(&osdc->req_linger);
osd_init(&osdc->homeless_osd);
osdc->homeless_osd.o_osdc = osdc;
osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
osdc->linger_requests = RB_ROOT;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
spin_lock_init(&osdc->event_lock);
osdc->event_tree = RB_ROOT;
osdc->event_count = 0;
err = -ENOMEM;
osdc->osdmap = ceph_osdmap_alloc();
......@@ -2838,6 +3487,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
osd_cleanup(&osdc->homeless_osd);
WARN_ON(!list_empty(&osdc->osd_lru));
WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
WARN_ON(atomic_read(&osdc->num_requests));
WARN_ON(atomic_read(&osdc->num_homeless));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment